Review Comment:
This paper introduces a new SparkSQL-based SPARQL query evaluator
called DIAERESIS. This system is based on a novel Data Partitioning
scheme that tries to split the graph using information derived from
the schema graph.
The paper is composed of two main parts: the first explains the
novel partitioning scheme and the second validates the interest of
the approach by running a benchmark comparison against other
Spark-based SPARQL query evaluators, namely S2RDF, SPARQLGX and WORQ.
I think the approach has great qualities:
- extending the vertical partitioning with a partitioning on nodes is
  a good idea, especially for a Spark-based method where this
  partitioning serves as a sort of index. It is particularly
  interesting to find partitioning schemes that 1/ work well and
  reduce the query time and 2/ scale consistently with Apache Spark;
- the approach was actually implemented, the code is available, and I
  was able to run it (with some minor difficulties);
- the authors ran a benchmark over several datasets against several
  competitors (which is a long and challenging task) and also tried
  to assess the benefit of the method.
The paper has been greatly improved since the first pass, but I still
see important problems in its current form. While doing my review, I
ended up running the benchmark (on LUBM 10240) and found vastly
different results from the ones shown in your paper. Please explain
the differences. Without the complete script for your benchmark, I
fear that the benchmark might have (mistakenly?) been configured in a
way that very much favors your solution.
===================================================
DETAILED REMARKS
===================================================
First, about the benchmark, which I find a bit weak, here are two
ideas to improve it:
- use competitors that are not Spark-based. I fully agree that it is
  not because Virtuoso (or any other) outperforms you in certain cases
  that your method is not interesting, but we need to see a comparison
  with up-to-date and actually used stores;
- use more/other datasets that take the scale of the data into
  account. At the moment you have three datasets: LUBM, SWDF and
  DBpedia. LUBM is synthetic and very peculiar, SWDF is very small,
  and DBpedia is not fully supported by most of your competitors… (not
  supported… or you did not give them enough resources).
Regardless of those two proposals, you should better explain how
specifically the proposed method improves the performance of your
system. Section 6.3 is a nice addition, but maybe you could show, on a
specific query, how the partitioning helps? For instance: what is the
chosen partitioning on LUBM and how does it help on a particular
query?
Also, I would like to see the complete benchmark with the number of
partitions set to 10 (the one you used) and also with the number of
partitions set to 1, so that we can get an idea of the performance
benefits of your specific partitioning method versus the performance
benefits coming from the Parquet files and the nice optimizations that
Spark gives to SparkSQL (on that topic, have you considered including
Sparklify in your benchmark?).
I don't fully understand the "data reduction" numbers that you
give. How do you estimate them? Do you use HDFS metadata to know how
much data is accessed? If so, why is it in M rows? If you do measure
rows, you should state it more clearly and try to assess the amount of
data this corresponds to in each store. Also, still regarding Fig. 9,
I don't completely understand how splitting into 10 partitions helps
you divide the amount of data touched by 100. One would expect a
division by 10 (when everything fits in a single partition). You can
sometimes reduce a bit more (if you are lucky and fall into a small
partition), but otherwise it should be bigger? Or is it because your
scheme does something like putting (on LUBM) students and professors
in two different partitions, which allows you to be efficient and look
only at professors? What happens, e.g., on Q4 so that you get such a
reduction?
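To spell out my expectation: with a dataset of T triples split into
k = 10 partitions of roughly T/k triples each, a query that falls
entirely into one partition touches about

  T / (T/k) = k = 10

times less data; a ×100 reduction needs either very skewed partitions
or some further subpartitioning that the text should then credit
explicitly.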
In the benchmark section, you talk about the cache that you seem to
have activated, but then you say that you average over 10
executions. If you did activate the cache AND ran each query 10 times,
that would be an unfair advantage to your system (which is why it
would be nice to have the script that you used to run the benchmark).
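For reference, the protocol I would consider fair clears the Spark
cache between repetitions. Here is a minimal sketch of what I mean
(spark is a SparkSession and runQuery is a hypothetical stand-in for
your execution step, not a name taken from your code):

import org.apache.spark.sql.{DataFrame, SparkSession}

def benchmark(spark: SparkSession, runQuery: () => DataFrame): Double = {
  val timesMs = (1 to 10).map { _ =>
    spark.catalog.clearCache()   // drop all cached tables so each run starts cold
    val t0 = System.nanoTime()
    runQuery().count()           // count() forces the lazy plan to execute
    (System.nanoTime() - t0) / 1e6
  }
  timesMs.sum / timesMs.size     // average over 10 now-comparable runs
}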
You say that SPARQLGX wins in terms of "storage overhead and
preprocessing time"; it might be true for the preprocessing time, but
it does not seem to be the case for the storage overhead.
What does the following sentence mean: "For DIAERESIS, we configured
Spark with 12 cores per worker (to achieve a total of 48 cores),
whereas we left the default configuration for other systems"? Did you
make sure that all systems had access to the same number of cores and
workers? (Another reason why it would be nice to have the script that
you used to run the benchmark.)
I would also like to better understand what happens at loading
time. In Table 3, for LUBM, your method takes 8 min for 13 MTP and
124 min for 173 MTP (so ×15 time for ×13 TP, which is reasonable), but
then it takes only 130 min for 266 MTP (×1.5 in TP) and 187 min for
1350 MTP (×8 TP compared to 173 MTP, for a time that is less than ×2).
Do you have any idea why?
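To make the anomaly explicit, the implied loading throughputs are:

  13/8 ≈ 1.6 MTP/min,   173/124 ≈ 1.4 MTP/min,
  266/130 ≈ 2.0 MTP/min,  1350/187 ≈ 7.2 MTP/min

i.e., the largest dataset loads about five times more triples per
minute than the mid-sized ones.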
In the DBpedia benchmark, the queries are very simple: you have 67
queries with a single TP and a single variable, 7 with two TPs, 9 with
three, 25 with seven and 4 with eight. There are only 4 queries with
four variables (and none with more), etc.
Regarding your code, cloning it gives different permissions to the
different scripts (run_dap and run_optimizer are not executable
whereas run_query and run_translator are). The script you used to
benchmark your competitors is missing. DIAERESIS needs to be modified
before being launched. It is quite normal to have a configuration file
to edit, but here we need to edit the 4 scripts… and it is not
entirely clear to what values. I also ended up changing the number of
executors, as your system was not fully utilizing my cluster (both at
loading and querying time).
When running run_dap.sh to load the data on the big LUBM dataset, I
received errors of the form "TaskCommitDenied (Driver denied task
commit)", which generally happens with large joins, but the system
ended up loading the data correctly (at least it seems so).
In your code you have a script to "translate" and one to "execute",
and in the benchmark you say that you measure the execution time (and
do not mention the translation time). I would imagine that the
translation time is not very high, but in your benchmark you end up
with some running times in the order of 1/10th of a second. The
question is: do you measure the translation time? If not, I would like
to see the average time to translate a query. On my cluster, despite
similar loading times, translating one query took 38 s, much more than
all your queries in the benchmark. Of course, one could argue that it
makes sense to "pre-translate" and only compare running times, but I
wonder whether you did the same for the other stores? SPARQLGX, for
instance, has the same behaviour (translation and then execution) and
I don't think you distinguish the two.
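Concretely, the number I would like to see reported is the translation
time on its own; a trivial sketch (translate is a stand-in for
whatever run_translator.sh invokes, not a name from your code):

def timeTranslation(translate: String => String, sparql: String): (String, Double) = {
  val t0 = System.nanoTime()
  val sql = translate(sparql)              // the SPARQL-to-SQL translation step
  (sql, (System.nanoTime() - t0) / 1e6)    // translated query and the time in ms
}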
When running your code to load LUBM10240 with 10 partitions (as
specified in the paper), I obtained a folder size of 117.9 GB. I
understand that there might be fluctuations in the size; I loaded the
dataset twice, and twice I got the same size. How is it possible that
you got 44.32 GB while I get 117.9 GB? Here is the content of the
folder on HDFS:
23.1 M lubm_data/classSubPart_index_10__00000
1.3 G lubm_data/classSubPart_index_10__00001
664.0 M lubm_data/classSubPart_index_10__00002
788.7 M lubm_data/classSubPart_index_10__00003
921.9 M lubm_data/classSubPart_index_10__00005
897.0 M lubm_data/classSubPart_index_10__00006
1.0 G lubm_data/classSubPart_index_10__00007
1.1 G lubm_data/classSubPart_index_10__00008
271.0 M lubm_data/classSubPart_index_10__00009
97.0 G lubm_data/cluster
7.8 G lubm_data/instance_edges
3.9 G lubm_data/instance_vertices
2.3 G lubm_data/node_index_10
12.0 K lubm_data/schema_cc
4.6 K lubm_data/schema_edges
2.3 K lubm_data/schema_vertices
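(The top-level sizes do add up to the total I report:

  97.0 + 7.8 + 3.9 + 2.3 + ≈6.9 (the classSubPart_index files) ≈ 117.9 GB.)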
And here is the content of cluster/clusters_10_bal:
95.8 M 95.8 M lubm_data/cluster/clusters_10_bal/00000
4.0 G 4.0 G lubm_data/cluster/clusters_10_bal/00001
2.8 G 2.8 G lubm_data/cluster/clusters_10_bal/00002
12.8 G 12.8 G lubm_data/cluster/clusters_10_bal/00003
6.0 G 6.0 G lubm_data/cluster/clusters_10_bal/00005
3.6 G 3.6 G lubm_data/cluster/clusters_10_bal/00006
6.8 G 6.8 G lubm_data/cluster/clusters_10_bal/00007
7.7 G 7.7 G lubm_data/cluster/clusters_10_bal/00008
676.5 M 676.5 M lubm_data/cluster/clusters_10_bal/00009
0 0 lubm_data/cluster/clusters_10_bal/_SUCCESS
111.7 M 111.7 M lubm_data/cluster/clusters_10_bal/part-00000-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
4.7 G 4.7 G lubm_data/cluster/clusters_10_bal/part-00001-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
3.5 G 3.5 G lubm_data/cluster/clusters_10_bal/part-00002-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
15.0 G 15.0 G lubm_data/cluster/clusters_10_bal/part-00003-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
7.5 G 7.5 G lubm_data/cluster/clusters_10_bal/part-00005-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
4.5 G 4.5 G lubm_data/cluster/clusters_10_bal/part-00006-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
7.6 G 7.6 G lubm_data/cluster/clusters_10_bal/part-00007-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
8.9 G 8.9 G lubm_data/cluster/clusters_10_bal/part-00008-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
748.2 M 748.2 M lubm_data/cluster/clusters_10_bal/part-00009-a4cbfc49-bccb-4ba8-99d3-ec8f02fbf8b6-c000.snappy.parquet
Another important difference between what you claim and what I
observed is the query time. Just after loading, I ran query Q4 with
your script run_query.sh (after translating it) and it took 3 min,
whereas in your paper you claim a running time of under 1 s. I tried
running it a second time, and it went slightly faster (92 s) but still
an order of magnitude slower than what you claim. I ran it a third
time and it took 119 s. Your own program computes a time (23087 ms the
first time, 22013 ms the second and 4349 ms the third), but this time
is misleading: you deliberately leave some parts of the computation
outside of it (for instance you have a function preloadTables). Even
taking your own numbers into account, this is 22 times (or 4× if we
count the third attempt) slower than what you claim. The computation
time on Spark (so not the time to submit or anything) was 3 min…
Looking at the code, we have in QueryProcessor.scala:
val result = executeFinalQuery(queryMap)  // runs before the timer starts
var t1 = System.nanoTime()
result.count//.write.csv(Constants.HDFS + dataset + "/result_" + subPartitionMode +"/" + qName)
var t2 = System.nanoTime()                // only the count action is inside the timed window
How did you actually compute the time? Did you remove the loading
time? Did you run the queries in batch? If so, did you do it for all
stores? Here is what was shown in the Spark interface for the third
iteration, seconds before the script ended:
https://docdro.id/Foy1hQS For reference, loading on my server took
3h30, hence slightly slower than, but similar to, yours.
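For comparison, here is the measurement I would consider fair, reusing
the names that appear in your own code (preloadTables,
executeFinalQuery, queryMap; I am guessing the signatures), with the
preloading inside the timed region:

val t0 = System.nanoTime()
preloadTables()                          // guessed signature; preloading is timed too
val result = executeFinalQuery(queryMap)
result.count                             // forces the lazy plan, as in your code
val t1 = System.nanoTime()
println(s"end-to-end: ${(t1 - t0) / 1e6} ms")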
I also ran an experiment with the full LUBM set of
queries. Translating all of them took 50 s (that is an improvement in
per-query time) and running them in batch (so not on a cold system)
took 21 minutes, which is quite far from the numbers you show.
According to Table 4, the running time should be 117 s, but 21 minutes
is 1260 s, so a ×10 increase in time to run the full batch (despite
the fact that the system benefited from running the queries in batch,
with caching and so on).
Of course, we have different systems, so it is normal not to get
exactly the same running times, but the storage sizes do not match
either and the times differ too much.
===================================================
MORE MINOR REMARKS
===================================================
Def 2, line 47 instance -> instance node
Def 3, path is a function? I don't understand this notation, as there
might be several paths (shortest or not) from v1 to v2.
Page 4, line 22, SPARQL is much more than just BGP+filter+Opt.
Page 8, line 20, I don't like the notation normal(BC(v)); maybe use
normalBC(v)? I don't see why you put a "u" in min(BC(u)) and add
u ∈ V_s next to the fraction. If it is the min over the whole graph,
you can write min_{u∈V_s}(BC(u)), but that is a long notation; or just
write minBC and define it in the text afterwards.
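Written out, the shorthand I am suggesting would be:

  minBC := min_{u∈V_s} BC(u)
  normalBC(v) = (BC(v) − minBC) / (…)

with minBC defined once in the text and the denominator kept as in
your current formula.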
Page 8, line 28, why "norma" and not "normal"?
Page 8, line 33 could you elaborate on why it is important to
"equalize" the importance of normalBC and normalInst? Why is it
normalBC + normalInst and not normalBC + 2 normalInst?
Page 8, line 38 "the number of instance nodes of _each schema_ nodes"
Page 8, 4.2 you talk about partitions but they have not been defined at this point.
Page 9, def 6, you define DistinctsObjects(p(v_k,v_m)) as the number
of distinct v_m such that ... This is weird: if
DistinctsObjects(p(v_k,v_m)) does not depend on v_m, why not just
define DistinctsObjects(p, v_k)?
Page 9, def 7, the v_i have not been defined. I suppose this is the
path from v_s to v_e but what happens when the path is not unique?
Page 11, V_J should be V_j at line 12, no?
The mathematical notation of Theorem 1 does not really make sense to
me. You seem to define V_1 ... V_k as the partitioning for a given k,
then you compute Σ triples(V_i) for 1 ≤ i ≤ k, and then you look at
Σ triples(V_j) for 1 ≤ j ≤ k+1. I see that the text below says that
V_j is in DAP(V,k+1), and I think I understand what you want to say,
but mathematically, what you write is broken. You should define
|DAP(V,k)| = Σ triples(V_i) and then say |DAP(V,k)| ≤ |DAP(V,k+1)|.
Also, to me, it feels a bit pompous to call this a Theorem.
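Written out, the statement I would expect is:

  |DAP(V,k)| := Σ_{i=1}^{k} triples(V_i)   where DAP(V,k) = {V_1, …, V_k}

and then Theorem 1 simply reads |DAP(V,k)| ≤ |DAP(V,k+1)| for every
k ≥ 1.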
Once again, you define V_1 ... V_k, then use V_i and V_j, and I am not
sure what you mean in the statement of Theorem 2. According to the
text below the theorem, it should say that the amount of data stored
in each vertical subpartition decreases (on average) when k
increases. Maybe I missed something, but I think that this might be
true for i=j (which is not what is stated) and not otherwise. Imagine
a graph with 4 main nodes A-B-B'-C and 4n other nodes: the first 2n
nodes are connected to A and C with 2n distinct predicates, and the
next 2n nodes are connected to B or B' with the same predicate P. In a
partitioning where k=2, with one partition containing A and B and one
containing B' and C (where the remaining nodes go does not matter), we
have one edge between the two clusters and the average size of a
vertical subpartition is 2 (n subpartitions of size 1 plus one
subpartition of size n). When moving to three partitions, we will
select B' or B as the new centroid and it will have an average of n
elements per vertical subpartition.
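Making the arithmetic explicit:

  avg at k=2, partition {A,B}:  (n·1 + 1·n) / (n+1) ≈ 2
  avg at k=3, new partition {B'}:  n / 1 = n

so the average size per vertical subpartition grows with k in this
example.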