DIAERESIS: RDF Data Partitioning and Query Processing on SPARK

Tracking #: 3446-4660

Authors: 
Georgia Troullinou
Giannis Agathangelos
Haridimos Kondylakis
Kostas Stefanidis
Dimitris Plexousakis

Responsible editor: 
Aidan Hogan

Submission type: 
Full Paper
Abstract: 
The explosion of the web and the abundance of linked data demand effective and efficient methods for storage, management, and querying. Apache Spark is one of the most widely used engines for big data processing, with more and more systems adopting it for efficient query answering. Existing approaches that exploit Spark for querying RDF data adopt partitioning techniques to reduce the data that need to be accessed and thereby improve efficiency. However, simplistic data partitioning fails, on the one hand, to minimize data access and, on the other hand, to group data that are usually queried together. This translates into limited improvement in query-answering efficiency. In this paper, we present DIAERESIS, a novel platform that accepts as input an RDF dataset and effectively partitions it, minimizing data access and improving query-answering efficiency. To achieve this, DIAERESIS first identifies the top-k most important schema nodes, i.e., the most important classes, as centroids and distributes the other schema nodes to the centroid on which they mostly depend. Then, it allocates the corresponding instance nodes to the schema nodes under which they are instantiated. Our algorithm enables fine-tuning of data distribution, significantly reducing data access for query answering. We experimentally evaluate our approach using both synthetic and real workloads, strictly dominating the existing state of the art and improving query answering in several cases by orders of magnitude.
Tags: Reviewed

Decision/Status: 
Major Revision

Solicited Reviews:
Review #1
Anonymous submitted on 28/May/2023
Suggestion:
Accept
Review Comment:

I am happy that almost all my points were fully addressed. Thus, I vote for acceptance, but I have two further requests.

- the questions about SPARQL-to-SQL translation have only been partially answered: little is said about the process. I think a paragraph that describes this translation is necessary (see the sketch at the end of this review for the kind of example I have in mind). This links back to Section 2.2. The authors did an excellent job of improving the background. What is missing is a complete characterisation of which queries are supported by your approach; I believe these are conjunctive SPARQL queries with no path expressions.

- Theorems 1 and 2 are two great additions to the work. I believe that the relation between k and replication requires a corollary concerning the explanation of domain/range.

I think the two findings should be highlighted more in the evaluation results, with a dedicated figure that shows the trend of average storage and the number of triples with respect to increasing values of k, essentially extending Table 4 to provide empirical evidence for the theorems.
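Coming back to my first request about the SPARQL-to-SQL translation: even a short worked example would go a long way. The sketch below shows the kind of translation I assume is happening, over one vertical-partition table per predicate (all table, column and predicate names here are hypothetical, not taken from DIAERESIS):

import org.apache.spark.sql.SparkSession

// Sketch only: assumes the loader has registered one table per predicate,
// each with columns (s, o), e.g. tables "advisor" and "worksFor".
val spark = SparkSession.builder().appName("vp-translation-sketch").getOrCreate()

// SPARQL (conjunctive, no path expressions):
//   SELECT ?x ?z WHERE { ?x :advisor ?y . ?y :worksFor ?z }
// becomes a join over the two predicate tables:
val result = spark.sql(
  """SELECT a.s AS x, w.o AS z
    |FROM advisor a JOIN worksFor w ON a.o = w.s""".stripMargin)
result.show()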

Review #2
Anonymous submitted on 01/Aug/2023
Suggestion:
Accept
Review Comment:

This manuscript was submitted as 'full paper' and should be reviewed along the usual dimensions for research contributions which include (1) originality, (2) significance of the results, and (3) quality of writing. Please also assess the data file provided by the authors under “Long-term stable URL for resources”. In particular, assess (A) whether the data file is well organized and in particular contains a README file which makes it easy for you to assess the data, (B) whether the provided resources appear to be complete for replication of experiments, and if not, why, (C) whether the chosen repository, if it is not GitHub, Figshare or Zenodo, is appropriate for long-term repository discoverability, and (D) whether the provided data artifacts are complete. Please refer to the reviewer instructions and the FAQ for further information.

The suggested comments and feedback were taken into consideration and addressed in the revised version of the paper. This paper presents an effective technique for partitioning RDF datasets, minimizing data access and improving query efficiency. Extensive experiments are conducted on datasets of different sizes to compare performance against existing competing systems.

The authors have addressed my comments from the previous review cycle. The S3QLRDF work referred to in the manuscript is the version from 2019; however, further work on S3QLRDF was published in 2022, which is not mentioned. It is understandable that it may be hard to repeat the experiments with a new system; however, at least a comparison and discussion would be useful.

Overall, the paper presents an interesting approach for partitioning using centrality measures.

Review #3
By Louis Jachiet submitted on 08/Aug/2023
Suggestion:
Reject
Review Comment:

This paper introduces a new SparkSQL-based SPARQL query evaluator
called DIAERESIS. This system is based on a novel Data Partitioning
scheme that tries to split the graph using information derived from
the schema graph.

The paper is composed of two main parts: the first part explains the novel partitioning scheme, and the second part validates the interest of the approach by running a benchmark comparison with other Spark-based SPARQL query evaluators, namely S2RDF, SPARQLGX and WORQ.

I think the approach has great qualities:

- extending the vertical partitioning with a partitioning on nodes is a good idea, especially for a Spark-based method, where this partitioning serves as a sort of index. It is particularly interesting to find partitioning schemes that 1/ work well and reduce the query time and 2/ scale consistently with Apache Spark;

- the approach was actually implemented, the code is available, and I was able to run it (with some minor difficulties);

- the authors ran a benchmark over several datasets with several competitors (which is a long and challenging task) and also tried to assess the benefit of the method.

The paper has been greatly improved since the first round of reviews, but I still see important problems in its current form. While doing my review I ended up running the benchmark (on LUBM 10240) and found vastly different results from the ones shown in your paper. Please explain the differences. Without the complete script for your benchmark, I fear that the benchmark might have (mistakenly?) been configured in a way that very much favors your solution.

===================================================
DETAILED REMARKS
===================================================

First, about the benchmark, which I find a bit weak; here are two ideas to improve it:

- use competitors that are not Spark-based. I fully agree that your method can still be interesting even if Virtuoso (or any other store) outperforms it in certain cases, but we need to see a comparison with up-to-date and actually used stores;

- use more/other datasets that take into account the scale of the data. At the moment you have three datasets: LUBM, SWDF and DBpedia. LUBM is synthetic and very peculiar, SWDF is very small, and DBpedia is not fully supported by most of your competitors… (not supported… or you did not give them enough resources).

Regardless of those two proposals, you should explain better how, specifically, the proposed method improves the performance of your system. Section 6.3 is a nice addition, but maybe you could show, on a specific query, how the partitioning helps? For example: what is the chosen partitioning on LUBM, and how does it help on a particular query? I would also like to see the complete benchmark with the number of partitions set to 10 (the one you used) and also with the number of partitions set to 1, so that we can get an idea of the performance benefits of your specific partitioning method versus the performance benefits coming from the Parquet files and the nice optimizations that Spark provides to SparkSQL (regarding that topic, have you considered including Sparklify in your benchmark?).

I don't fully understand the "data reduction" numbers that you give. How do you estimate them? Do you use HDFS metadata to know how much data is accessed? If so, why is it in M rows? If you do measure rows, you should state it more clearly and try to assess the amount of data this corresponds to in each store. Also, still regarding Fig. 9, I don't completely understand how splitting into 10 partitions helps you divide the amount of data touched by 100. One would expect a division by 10 (when everything fits in a single partition). You can sometimes reduce a bit more (if you are lucky and fall into a small partition), but otherwise it should be bigger? Or is it because your scheme does something like putting (on LUBM) students and professors in two different partitions, which allows you to be efficient and look only at professors? What happens, e.g., on Q4 so that you get such a reduction?
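For what it is worth, if the unit really is rows, I would have expected a measurement along these lines (purely illustrative; the base path and subpartition names are hypothetical):

import org.apache.spark.sql.SparkSession

// Illustrative sketch: sum the row counts of the vertical subpartitions that a
// given query actually reads; everything not listed here is never touched.
val spark = SparkSession.builder().getOrCreate()
val base = "hdfs:///lubm_data/cluster"                             // hypothetical
val touched = Seq("partition_3/takesCourse", "partition_3/type")   // hypothetical
val rowsAccessed = touched.map(t => spark.read.parquet(s"$base/$t").count()).sum
println(s"rows accessed: $rowsAccessed")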

In the benchmark section, you talk about the cache that you seem to have activated, but then you say that you average over 10 executions. If you did activate the cache AND ran each query 10 times, that would be an unfair advantage to your system (which is why it would be nice to have the script that you used to run the benchmark).
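A measurement loop of the following kind would make the averaging unambiguous, since the cache is cleared between the 10 executions (sketch; the SQL string is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val sqlQuery = "SELECT 1"   // placeholder for the translated query under test

// Clear Spark's cache between runs so repeated executions do not benefit from caching.
val timesMs = (1 to 10).map { _ =>
  spark.catalog.clearCache()
  val t0 = System.nanoTime()
  spark.sql(sqlQuery).count()               // force full evaluation
  (System.nanoTime() - t0) / 1e6
}
println(s"average over 10 runs: ${timesMs.sum / timesMs.size} ms")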

You say that SPARQLGX wins in terms of "storage overhead and preprocessing time"; it might be true for the preprocessing time, but it does not seem to be the case for the storage overhead.

What does the following sentence mean: "For DIAERESIS, we configured Spark with 12 cores per worker (to achieve a total of 48 cores), whereas we left the default configuration for other systems"? Did you make sure that all systems had access to the same number of cores and workers? (Another reason why it would be nice to have the script that you used to run the benchmark.)
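What I would expect is that the same totals are pinned explicitly for every system, e.g. something like the following (the concrete values are of course hypothetical):

import org.apache.spark.SparkConf

// Hypothetical values: the point is only that every system is given the same totals.
val conf = new SparkConf()
  .set("spark.executor.cores", "12")        // cores per executor
  .set("spark.executor.instances", "4")     // 4 executors x 12 cores = 48 cores
  .set("spark.executor.memory", "16g")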

I would also like to understand better what happens at loading time. In Table 3, for LUBM, your method takes 8 min for 13 MTP and 124 min for 173 MTP (so ×15 the time for ×13 the TP, which is reasonable), but then it takes only 130 min for 266 MTP (a ×3 in TP) and 187 min for 1350 MTP (a ×8 in TP compared to 173, for a time that is less than ×2). Do you have any idea why?

In the DBpedia benchmark, the queries are very simple: you have 67 queries with a single TP and a single variable, 7 with two TPs, 9 with three, 25 with seven and 4 with eight. There are only 4 queries with four variables (and none with more), etc.

Regarding your code, cloning it gives different permissions to the different scripts (run_dap and run_optimizer are not executable, whereas run_query and run_translator are). The script you used to benchmark your competitors is not provided. DIAERESIS needs to be modified before being launched: it is quite normal to have a configuration file to edit, but here we need to edit the 4 scripts… and it is not entirely clear to what values. I also ended up changing the number of executors, as your system was not fully utilizing my cluster (both at loading and querying time).

When running run_dap.sh to load the data for the big LUBM dataset, I received errors of the form "TaskCommitDenied (Driver denied task commit)", which generally happens because of large joins, but the system ended up loading the data correctly (at least it seems so).

In your code you have a script to "translate" a query and one to "execute" it, and in the benchmark you say that you measure the execution time (and do not mention the translation time). I would imagine that the translation time is not very high, but in your benchmark you end up with some running times on the order of 1/10th of a second. The question is: do you measure the translation time? If not, I would like to see the average time to translate a query. On my cluster, despite a similar loading time, translating one query took 38 s, much more than all your queries in the benchmark. Of course, one could argue that it makes sense to "pre-translate" and only compare running times, but I wonder whether you did the same for the other stores? SPARQLGX, for instance, has the same behaviour (translation and then query) and I don't think you distinguish the two.
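Reporting the translation time separately would be cheap, e.g. (sketch; translateQuery is a hypothetical placeholder standing for whatever run_translator.sh actually invokes):

// Sketch: time the SPARQL-to-SQL translation separately from execution.
// translateQuery is a placeholder for the translator's entry point.
def translateQuery(sparql: String): String = sparql   // replace with the actual call

val sparqlQuery = "SELECT ?x WHERE { ?x ?p ?o }"
val t0 = System.nanoTime()
val sqlQuery = translateQuery(sparqlQuery)
println(s"translation took ${(System.nanoTime() - t0) / 1e6} ms")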

When running your code to load LUBM10240 with 10 partitions (as specified in the paper), I obtain a file size of 117.9 GB. I understand that there might be fluctuations in the size, so I loaded the dataset twice, and twice I got the same size. How is it possible that you got 44.32 GB while I get 117.9 GB? Here is the content of the folder on HDFS:

23.1 M lubm_data/classSubPart_index_10__00000
1.3 G lubm_data/classSubPart_index_10__00001
664.0 M lubm_data/classSubPart_index_10__00002
788.7 M lubm_data/classSubPart_index_10__00003
921.9 M lubm_data/classSubPart_index_10__00005
897.0 M lubm_data/classSubPart_index_10__00006
1.0 G lubm_data/classSubPart_index_10__00007
1.1 G lubm_data/classSubPart_index_10__00008
271.0 M lubm_data/classSubPart_index_10__00009
97.0 G lubm_data/cluster
7.8 G lubm_data/instance_edges
3.9 G lubm_data/instance_vertices
2.3 G lubm_data/node_index_10
12.0 K lubm_data/schema_cc
4.6 K lubm_data/schema_edges
2.3 K lubm_data/schema_vertices

And here is the content of cluster/clusters_10_bal

95.8 M 95.8 M lubm_data/cluster/clusters_10_bal/00000
4.0 G 4.0 G lubm_data/cluster/clusters_10_bal/00001
2.8 G 2.8 G lubm_data/cluster/clusters_10_bal/00002
12.8 G 12.8 G lubm_data/cluster/clusters_10_bal/00003
6.0 G 6.0 G lubm_data/cluster/clusters_10_bal/00005
3.6 G 3.6 G lubm_data/cluster/clusters_10_bal/00006
6.8 G 6.8 G lubm_data/cluster/clusters_10_bal/00007
7.7 G 7.7 G lubm_data/cluster/clusters_10_bal/00008
676.5 M 676.5 M lubm_data/cluster/clusters_10_bal/00009
0 0 lubm_data/cluster/clusters_10_bal/_SUCCESS
111.7 M 111.7 M lubm_data/cluster/clusters_10_bal/part-00000-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
4.7 G 4.7 G lubm_data/cluster/clusters_10_bal/part-00001-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
3.5 G 3.5 G lubm_data/cluster/clusters_10_bal/part-00002-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
15.0 G 15.0 G lubm_data/cluster/clusters_10_bal/part-00003-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
7.5 G 7.5 G lubm_data/cluster/clusters_10_bal/part-00005-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
4.5 G 4.5 G lubm_data/cluster/clusters_10_bal/part-00006-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
7.6 G 7.6 G lubm_data/cluster/clusters_10_bal/part-00007-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
8.9 G 8.9 G lubm_data/cluster/clusters_10_bal/part-00008-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
748.2 M 748.2 M lubm_data/cluster/clusters_10_bal/part-00009-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet

Another important difference between what you claim and what I observed is the query time. Just after loading, I ran the query Q4 with your script run_query.sh (having translated it beforehand), and it took 3 min to run, whereas in your paper you claim a running time of under 1 s. I tried running it a second time, and it went slightly faster (92 s) but was still an order of magnitude slower than what you claim. I ran it a third time and it took 119 s. Your own program computes a time (which was 23087 ms the first time, 22013 ms the second and 4349 ms the third), but this time is misleading: you deliberately leave some part of the computation outside of it (for instance, you have a function preloadTables), but even taking your own measure, this is 22 times (or 4× if we count the third attempt) slower than what you claim. The computation time on Spark (so not the time to submit or anything) was 3 min… Looking at the code, in QueryProcessor.scala we have:

val result = executeFinalQuery(queryMap)
var t1 = System.nanoTime()
result.count//.write.csv(Constants.HDFS + dataset + "/result_" + subPartitionMode +"/" + qName)
var t2 = System.nanoTime()

How did you actually compute the time? Did you remove the loading time? Did you run the queries in batch? If so, did you do it for all stores? Here is what was shown in the Spark interface for the third iteration, seconds before the script ended: https://docdro.id/Foy1hQS For reference, loading on my server took 3h30, so slightly slower than, but similar to, yours.

I also ran an experiment with the full LUBM set of queries. Translating all of them took 50 s (which is an improvement in per-query time), and running them in batch (so not on a cold system) took 21 minutes, which is quite far from the reported numbers. According to Table 4, the running time should be 117 s, but 21 minutes is 1260 s, so a ×10 increase in time to run the full batch (despite the fact that the system benefited from running the queries in batch, with caching and so on).

Of course, we have different systems, so it is normal not to obtain the same running times, but we also do not obtain the same data size, and the times differ too much.

===================================================
MORE MINOR REMARKS
===================================================

Def 2, line 47 instance -> instance node

Def 3, path is a function? I don't understand this notation as there
might be several (shortest or not) paths from v1 to v2.

Page 4, line 22, SPARQL is much more than just BGP+filter+Opt.

Page 8, line 20, I don't like the notation normal(BC(v)); maybe use normalBC(v)? I don't see why you put a "u" in the min(BC(u)) and add u ∈ V_s next to the fraction. If it is the min over the whole graph, you can put min_{u∈V_s}(BC(u)), but that is a long notation, or just write minBC and define it in the text afterwards.

Page 8, line 28, why norma and not normal ?

Page 8, line 33 could you elaborate on why it is important to
"equalize" the importance of normalBC and normalInst? Why is it
normalBC + normalInst and not normalBC + 2 normalInst?

Page 8, line 38 "the number of instance nodes of _each schema_ nodes"

Page 8, 4.2 you talk about partitions but they have not been defined at this point.

Page 9, Def 6, you define DistinctsObjects(p(v_k,v_m)) as the number of distinct v_m such that ... This is weird: if DistinctsObjects(p(v_k,v_m)) does not depend on v_m, just define DistinctsObjects(p, v_k)?
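That is, something like the following (assuming the definition quantifies over the triples of the dataset):

\mathrm{DistinctsObjects}(p, v_k) \;=\; \bigl|\{\, v_m \mid (v_k, p, v_m) \text{ is a triple of the dataset} \,\}\bigr|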

Page 9, def 7, the v_i have not been defined. I suppose this is the
path from v_s to v_e but what happens when the path is not unique?

Page 11, V_J should be V_j at line 12, no?

The math notation of Theorem 1 does not really make sense to me. You seem to define V_1 ... V_k as the partitioning for k, then you compute Σ triples(V_i) for 1 ≤ i ≤ k, and then you look at Σ triples(V_j) for 1 ≤ j ≤ k+1. I see that the text below says that V_j is in DAP(V,k+1), and I think I understand what you want to say, but mathematically, what you write is broken. You should define |DAP(V,k)| = Σ triples(V_i) and then say |DAP(V,k)| ≤ |DAP(V,k+1)|. Also, to me, it feels a bit pompous to call this a Theorem.
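Written out, the statement I have in mind is:

\mathrm{DAP}(V,k) = \{V_1, \dots, V_k\}, \qquad
\bigl|\mathrm{DAP}(V,k)\bigr| \;=\; \sum_{i=1}^{k} \mathrm{triples}(V_i), \qquad
\bigl|\mathrm{DAP}(V,k)\bigr| \;\le\; \bigl|\mathrm{DAP}(V,k+1)\bigr|.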

Once again, you define V_1 ... V_k, then use V_i and V_j, and I am not sure what you mean in the statement of Theorem 2. According to the text below the theorem, it should say that the amount of data stored in each vertical subpartition decreases (on average) when k increases. Maybe I missed something, but I think that this might be true for i=j (which is not what is stated) and not otherwise. Imagine a graph with 4 main nodes A-B-B'-C and 4n other nodes: the first 2n nodes are connected to A and C with 2n distinct predicates, and the next 2n nodes are connected to B or B' with the same predicate P. In a partitioning where k=2 and we have one partition with A and B and one with B' and C (we don't care where B'' is), we have one edge between the two clusters and the average size of a vertical subpartition is 2 (n subpartitions of size 1 plus one subpartition of size n). When moving to three partitions, we will select B' or B as the new centroid, and it will have an average of n elements per vertical subpartition.