08-01-2013, 10:57 AM
Scalable Scheduling of Updates in Streaming Data Warehouses
Abstract
We discuss update scheduling in streaming data warehouses, which combine the features of traditional data warehouses
and data stream systems. In our setting, external sources push append-only data streams into the warehouse with a wide range of
interarrival times. While traditional data warehouses are typically refreshed during downtimes, streaming warehouses are updated as
new data arrive. We model the streaming warehouse update problem as a scheduling problem, where jobs correspond to processes
that load new data into tables, and the objective is to minimize data staleness over time (at time t, if a table has been updated with
information up to some earlier time r, its staleness is t minus r). We then propose a scheduling framework that handles the
complications encountered by a stream warehouse: view hierarchies and priorities, data consistency, inability to preempt updates,
heterogeneity of update jobs caused by different interarrival times and data volumes among different sources, and transient overload.
A novel feature of our framework is that scheduling decisions do not depend on properties of update jobs (such as deadlines), but
rather on the effect of update jobs on data staleness. Finally, we present a suite of update scheduling algorithms and extensive
simulation experiments to map out factors which affect their performance.
INTRODUCTION
Traditional data warehouses are updated during downtimes
[25] and store layers of complex materialized
views over terabytes of historical data. On the other hand,
Data Stream Management Systems (DSMS) support simple
analyses on recently arrived data in real time. Streaming
warehouses such as DataDepot [15] combine the features of
these two systems by maintaining a unified view of current
and historical data. This enables real-time decision
support for business-critical applications that receive
streams of append-only data from external sources.
Applications include:
. Online stock trading, where recent transactions
generated by multiple stock exchanges are compared
against historical trends in nearly real time to
identify profit opportunities;
. Credit card or telephone fraud detection, where
streams of point-of-sale transactions or call details
are collected in nearly real time and compared with
past customer behavior;
Scheduling Challenges
Real-time scheduling is a well-studied topic with a lengthy
literature [7]. However, our problem introduces unique
challenges that must be dealt with simultaneously in a
streaming warehouse.
Scheduling metric. Many metrics have been considered
in the real-time scheduling literature. In a typical hard real-time
system, jobs must be completed before their deadlines—
a simple metric to understand and to prove results
about. In a firm real-time system, jobs can miss their
deadlines, and if they do, they are discarded. The performance
metric in a firm real-time system is the fraction of jobs
that meet their deadlines. However, a streaming warehouse
must load all of the data that arrive; therefore no updates can
be discarded. In a soft real-time system, late jobs are allowed
to stay in the system, and the performance metric is lateness
(or tardiness), which is the difference between the completion
times of late jobs and their deadlines. However, we are
not concerned about properties of the update jobs. Instead,
we will define a scheduling metric in terms of data staleness,
roughly defined as the difference between the current time
and the time stamp of the most recent record in a table.
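The staleness metric just described can be sketched in a few lines of code. This is an illustrative model, not the paper's implementation; the names `Table`, `load_batch`, and `staleness` are hypothetical.

```python
# Sketch of the staleness metric: at time t, a table whose most
# recent loaded record has time stamp r has staleness t - r.

class Table:
    def __init__(self, name):
        self.name = name
        self.freshness = 0.0  # time stamp of the most recent loaded record

    def load_batch(self, batch_max_timestamp):
        # Loads are append-only, so freshness only moves forward.
        self.freshness = max(self.freshness, batch_max_timestamp)

def staleness(table, now):
    return now - table.freshness

t = Table("router_cpu")
t.load_batch(10.0)
print(staleness(t, 12.5))  # 2.5
```

Note that staleness grows linearly between updates and drops each time a batch is loaded, which is what the scheduling algorithms later try to minimize on average.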
SYSTEM MODEL
Streaming Warehouse Architecture
Table 1 lists the symbols used in this paper. Fig. 1 illustrates a
streaming data warehouse. Each data stream i is generated
by an external source, with a batch of new data, consisting of
one or more records, being pushed to the warehouse with
period Pi. If the period of a stream is unknown or
unpredictable, we let the user choose a period with which
the warehouse should check for new data. Examples of
streams collected by an Internet Service Provider include
router performance statistics such as CPU usage, system logs,
routing table updates, link layer alerts, etc. An important
property of the data streams in our motivating applications is
that they are append-only, i.e., existing records are never
modified or deleted. For example, a stream of average router
CPU utilization measurements may consist of records with
fields (time stamp, router_name, CPU_utilization), and a new
data file with updated CPU measurements for each router
may arrive at the warehouse every 5 minutes.
A streaming data warehouse maintains two types of
tables: base and derived. Each table may be stored partially
or wholly on disk. A base table is loaded directly from a
data stream. A derived table is a materialized view defined
as an SQL query over one or more (base or derived) tables.
Each base or derived table Tj has a user-defined priority pj
and a time-dependent staleness function Sj(·) that will be
defined shortly. Relationships among source and derived
tables form a (directed and acyclic) dependency graph. For
each table Tj, we define a set of its ancestor tables as those
which directly or indirectly serve as its sources, and a set of
its dependent tables as those which are directly or indirectly
sourced from Tj. For example, T1, T2 and T3 are ancestors of
T4, and T3 and T4 are dependents of T1.
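The ancestor and dependent sets over the dependency graph can be computed by a simple traversal. The sketch below is hypothetical (the paper does not give code); edges point from a source table to the tables directly derived from it, mirroring the example in the text where T1, T2, and T3 are ancestors of T4.

```python
# Dependency graph traversal: graph maps each table to the set of
# tables directly derived from it (source -> derived edges in a DAG).

def ancestors(graph, table):
    # Tables that directly or indirectly serve as sources of `table`.
    parents = {t for t, children in graph.items() if table in children}
    result = set(parents)
    for p in parents:
        result |= ancestors(graph, p)
    return result

def dependents(graph, table):
    # Tables directly or indirectly sourced from `table`.
    result = set(graph.get(table, ()))
    for child in graph.get(table, ()):
        result |= dependents(graph, child)
    return result

# T3 is derived from T1 and T2; T4 is derived from T3.
graph = {"T1": {"T3"}, "T2": {"T3"}, "T3": {"T4"}}
print(sorted(ancestors(graph, "T4")))   # ['T1', 'T2', 'T3']
print(sorted(dependents(graph, "T1")))  # ['T3', 'T4']
```

Acyclicity of the graph guarantees the recursion terminates; for very deep hierarchies an iterative traversal would avoid recursion limits.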
Warehouse Consistency
Following the previous work on data warehousing, we
want derived tables to reflect the state of their sources as of
some point in time [10], [35]. Suppose that D is derived
from T1 and T2, which were last updated at times 10:00 and
10:05, respectively. If T1 and T2 incur arbitrary insertions,
modifications, and deletions, it may not be possible to
update D such that it is consistent with T1 and T2 as of some
point in time, say, 10:00 (we would have to roll back the
state of T2 to time 10:00, which requires multiversioning).
However, tables in a streaming warehouse are not “snapshots”
of the current state of the data, but rather they collect
all the (append-only) data that have arrived over time (or at
least within a large window of time). Since the data are
append-only, each record has exactly one “version.” For
now, suppose that data arrive in time stamp order. We can
extract the state of T2 as of time 10:00 by selecting records
with time stamps up to and including 10:00. Using these
records, we can update D such that it is consistent with T1
and T2 as of time 10:00.
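Because the data are append-only, extracting the state of a source as of some time is just a time stamp filter, so a consistency point for a derived table is the minimum freshness over its sources. A minimal sketch of this argument, with hypothetical names and times encoded as plain floats:

```python
# Append-only consistency: the state of a source as of time tau is the
# set of records with time stamps <= tau, so a derived table can be
# made consistent with all sources as of the minimum source freshness.

def state_as_of(records, tau):
    # records: append-only list of (time_stamp, payload) tuples
    return [r for r in records if r[0] <= tau]

t1 = [(9.50, "a"), (10.00, "b")]                 # last updated at 10.00
t2 = [(9.75, "x"), (10.00, "y"), (10.05, "z")]   # last updated at 10.05

tau = min(t1[-1][0], t2[-1][0])  # consistency point: 10.00
print(state_as_of(t2, tau))      # [(9.75, 'x'), (10.0, 'y')]
```

No multiversioning or rollback is needed: the record with time stamp 10.05 is simply excluded until T1 catches up.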
SCHEDULING ALGORITHMS
This section presents our scheduling framework. The idea is
to partition the update jobs by their expected processing
times, and to partition the available computing resources into
tracks. A track logically represents a fraction of the computing
resources required by our complex jobs, including CPU,
memory, and disk I/Os. When an update job is released, it is
placed in the queue corresponding to its assigned partition
(track), where scheduling decisions are made by a local
scheduler running a basic algorithm (however, the algorithm
that we will present in Section 3.2.3 generalizes this
assumption). We assume that each job is executed on exactly
one track, so that tracks become a mechanism for limiting
concurrency and for separating long jobs from short jobs
(with the number of tracks being the limit on the number of
concurrent jobs). For simplicity, we assume that the same
type of basic scheduling algorithm is used for each track.
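The track mechanism described above can be sketched as a set of independent queues, each drained by a local policy. This is a minimal illustration under assumed names (`Track`, `release`, `next_job`), not the paper's implementation; the ordering key stands in for whatever basic algorithm the track runs.

```python
# Tracks as a concurrency limit: each track holds its own queue of
# released jobs and runs one job at a time, so the number of tracks
# bounds the number of concurrent update jobs.

import heapq

class Track:
    def __init__(self):
        self.queue = []  # min-heap ordered by the local policy's key

    def release(self, priority_key, job):
        # The key encodes the local scheduling policy
        # (e.g., a deadline for EDF).
        heapq.heappush(self.queue, (priority_key, job))

    def next_job(self):
        return heapq.heappop(self.queue)[1] if self.queue else None

tracks = [Track(), Track()]          # e.g., one for short jobs, one for long
tracks[0].release(1.0, "update T1")
tracks[0].release(0.5, "update T2")
print(tracks[0].next_job())  # update T2 (smallest key first)
```

Separating short and long jobs into different tracks is what later prevents long updates from starving frequently refreshed tables.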
Job Partitioning
If a job set is heterogeneous with respect to the periods and
execution times (long execution times versus short periods),
scheduler performance is likely to benefit if some fraction of
the processing resources is guaranteed to short jobs
(corresponding to tables that are updated often, which
generally have higher priority). The traditional method for
ensuring resource allocation is to partition the job set and to
schedule each partition separately [7] (and to repartition the
set whenever new tables or sources are added or existing ones
removed, or whenever the parameters of existing jobs change
significantly). However, recent results indicate that global
scheduling (i.e., using a single track to schedule one or more
jobs at a time) provides better performance, especially in a soft
real-time setting, where job lateness needs to be minimized
[5], [12]. In this section, we investigate two methods for
ensuring resources for short jobs while still providing a
degree of global scheduling: EDF-Partitioned and Proportional.
EDF-Partitioned Strategy
The EDF-partitioned algorithm assigns jobs to tracks in a
way that ensures that each track has a feasible nonpreemptive
EDF schedule. A feasible schedule means that if the
local scheduler were to use the EDF algorithm to decide
which job to schedule next, all jobs would meet their
deadlines. In our setting, we assume that the deadline of an
update job is its release time plus its period, i.e., for each
table, we want to load every batch of new data before the
next batch arrives.
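The deadline convention in this section (release time plus period) and a track-admission check can be sketched as follows. The utilization test shown here is a simple heuristic of my own for illustration; the paper's actual feasibility condition for nonpreemptive EDF is stronger.

```python
# Job model for the EDF-partitioned strategy: an update job released
# at time r for a table with period P has deadline r + P (load each
# batch before the next one arrives).

def deadline(release_time, period):
    return release_time + period

def fits_on_track(track_jobs, new_job):
    # track_jobs, new_job: (execution_time, period) pairs.
    # Require total utilization <= 1 on the track. Nonpreemptive EDF
    # needs stronger conditions in general, so treat this as a
    # necessary filter, not a full feasibility test.
    jobs = track_jobs + [new_job]
    return sum(e / p for e, p in jobs) <= 1.0

print(deadline(100.0, 5.0))                     # 105.0
print(fits_on_track([(1.0, 5.0)], (2.0, 4.0)))  # True  (0.2 + 0.5 <= 1)
print(fits_on_track([(3.0, 5.0)], (2.0, 4.0)))  # False (0.6 + 0.5 > 1)
```

A job rejected by every track's check would trigger repartitioning, consistent with the repartitioning remark in the previous subsection.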
Dealing with Transient Overload
During transient overload, low-priority jobs are deferred in
favor of high priority jobs. When the period of transient
overload is over, the low-priority jobs will be scheduled for
execution. Since they have been delayed for a long time,
they will have accumulated a large freshness delta and
therefore a large execution time—and therefore might block
the execution of high-priority jobs. A solution to this
problem is to “chop up” the execution of the jobs that have
accumulated a long freshness delta to a maximum of c times
their period, for some c ≥ 1.0. This technique introduces a
degree of preemptibility into long jobs, reducing the
chances of priority inversion (low-priority jobs blocking
high-priority jobs) [7].
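The "chopping" idea can be made concrete with a short sketch (hypothetical function name; the paper describes the technique but gives no code): a backlogged job's freshness delta is split into pieces of at most c times its period, so a high-priority job waits for at most one piece rather than the whole backlog.

```python
# Chop a long accumulated freshness delta into pieces of at most
# c * period each (c >= 1.0), making long jobs partially preemptible.

def chop(freshness_delta, period, c=1.0):
    max_chunk = c * period
    chunks = []
    while freshness_delta > 0:
        piece = min(max_chunk, freshness_delta)
        chunks.append(piece)
        freshness_delta -= piece
    return chunks

print(chop(23.0, 5.0, c=1.0))  # [5.0, 5.0, 5.0, 5.0, 3.0]
```

Larger values of c trade lower per-piece overhead against longer worst-case blocking of high-priority jobs.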
CONCLUSIONS
In this paper, we motivated, formalized, and solved the
problem of nonpreemptively scheduling updates in a real-time
streaming warehouse. We proposed the notion of
average staleness as a scheduling metric and presented
scheduling algorithms designed to handle the complex
environment of a streaming data warehouse. We then
proposed a scheduling framework that assigns jobs to
processing tracks and uses basic algorithms to schedule jobs
within a track. The main feature of our framework is the
ability to reserve resources for short jobs that often
correspond to important frequently refreshed tables, while
avoiding the inefficiencies associated with partitioned
scheduling techniques.