СтартФоткиЖелезкиПрограммыШпаргалкиРадиоКонтактСоглашение

Причина

Для организации исторических данных в области анализа данных используются таблицы SCD2 (медленно меняющиеся измерения). Добавление к обычному измерению (дименшену) четырёх полей является достаточным и удобным отображением всех исторических явлений.

Решение

Попробуем решить задачу в базе данных Оракл. Возьмём две исходные таблицы: acc_categories.sql и acc_products.sql

Целевая таблица будет исторической и денормализованной: dim_products_scd.sqlseq_dim_products_scd.sql

Добавим поле LEVEL_CODE, его возможными значениями будут PRODUCT, CATEGORY, TITLE. Если необходимо вывести в выпадающий список фильтра только категорию, то сделаем выборку с фильтром CATEGORY.
Добавим техничкское поле TA_SOURCE, его возможными значениями будут названия источников данных, если их много, например, несколько магазинов.
Добавим поля PRODUCT_SURR_ID, IS_ACTIVE, VALID_FROM, VALID_TO для оргнизации историзации по принципу SCD2. Удаленные записи будем деактивировать, т.е. IS_ACTIVE='N' с датой.

Загрузка исторической таблицы

В целевой схеме выделим три уровня. Подготовим для каждого из них запрос, который покажет уникальные дынные и который станет основой для загрузки.

Категория:

SELECT pr.category_id AS category_id,
       cat.title      AS category_desc
FROM   acc_products pr,
       acc_categories cat
WHERE  pr.category_id = cat.id 

Группа:

SELECT DISTINCT title AS title_desc
FROM   acc_products 

Продукт:

SELECT pr.id          AS product_id,
       pr.name        AS product_code,
       pr.name
       || pr.title    AS product_desc,
       pr.title       AS title_desc,
       pr.category_id AS category_id,
       cat.title      AS category_desc
FROM   acc_products pr,
       acc_categories cat
WHERE  pr.category_id = cat.id 

Процедура загрузки состоит из трех частей: деактивация удаленных записей, обновление существующих и добавление новых.

Эту процедуру нужно будет повторить три раза для каждого уровня иерархии.

Категория. Удаленные и измененные данные.

-- --------------------------------
-- Categories: delete rows
-- --------------------------------
UPDATE dim_products_scd dst
SET    valid_to = To_date ('31.12.9999', 'dd.mm.yyyy'),
       is_active = 'N'
WHERE  dst.product_surr_id IN (
     -- deleted versions
      SELECT olds.product_surr_id AS product_surr_id
          FROM   (SELECT pr.category_id AS category_id,
                         cat.title      AS category_desc
                  FROM   acc_products pr,
                          acc_categories cat
                  WHERE  pr.category_id = cat.id) news,
                  (SELECT product_surr_id,
                          category_id,
                          category_desc
                    FROM   dim_products_scd
                    WHERE  is_active = 'Y'
                            AND level_code = 'CATEGORY') olds
           WHERE  news.category_id (+) = olds.category_id
           AND -- changed columns
           ( Decode (news.category_id, olds.category_id, 0, 1) = 1
           OR Decode (news.category_desc, olds.category_desc, 0, 1) = 1 )); 

Удаленные и измененные записи деактивируем, при этом удаленные можно будет найти по дате 31.12.9999 и is_active=N.
Итак, в таблице есть три независимых уровня, выбираем интересующий нас level_code=CATEGORY.

Натуральным ключом является category_id, по этому полю соединяем таблицы по принципу outer join.
Измененные записи нужно тоже деактивировать, чтобы потом создать им новую версию, поэтому ищем, что изменилось news.category_desc <> olds.category_desc.

Функция Decode в оракл выполняет дополнительно приведение типов. Зачем это нужно? В случае удаления записи в колонках news.category_desc получат значения NULL, так как мы сделали outer join. При прямом сравнении news.category_desc <> olds.category_desc оракл выдаст true.

В данном примере мы удалим категорию, если в ней нет продуктов, т.е. очищаем данные, так как в результате анализа данных оказалось, что в таблице категорий много тестовых записей, которые являются мусором.

Категория. Новые и измененные данные.

-- --------------------------------
-- Categories: new and changed rows
-- --------------------------------
MERGE INTO dim_products_scd dst
USING (
      -- new versions
      SELECT -333               AS product_surr_id,
             news.category_id   AS category_id,
             news.category_desc AS category_desc
      FROM   (SELECT pr.category_id AS category_id,
                     cat.title      AS category_desc
              FROM   acc_products pr,
                     acc_categories cat
              WHERE  pr.category_id = cat.id) news,
             (SELECT product_surr_id,
                     category_id,
                     category_desc
              FROM   dim_products_scd
              WHERE  is_active = 'Y'
                     AND level_code = 'CATEGORY') olds
      WHERE  news.category_id = olds.category_id (+)
        AND -- changed columns
        ( Decode (news.category_id, olds.category_id, 0, 1) = 1
           OR Decode (news.category_desc, olds.category_desc, 0, 1) = 1 )
       UNION ALL
       -- changed versions: without outer join
       SELECT olds.product_surr_id AS product_surr_id,
              news.category_id     AS category_id,
              news.category_desc   AS category_desc
       FROM   (SELECT id    AS category_id,
                      title AS category_desc
               FROM   acc_categories) news,
              (SELECT product_surr_id,
                      category_id,
                      category_desc
               FROM   dim_products_scd
               WHERE  is_active = 'Y'
                      AND level_code = 'CATEGORY') olds
       WHERE  news.category_id = olds.category_id
     AND -- changed columns
     ( Decode (news.category_id, olds.category_id, 0, 1) = 1
     OR Decode (news.category_desc, olds.category_desc, 0, 1) = 1 )) src
ON ( dst.product_surr_id = src.product_surr_id )
WHEN matched THEN
  UPDATE SET valid_to = SYSDATE,
             is_active = 'N'
WHEN NOT matched THEN
  INSERT ( dst.product_surr_id,
           dst.product_id,
           dst.product_code,
           dst.product_desc,
           dst.title_id,
           dst.title_desc,
           dst.category_id,
           dst.category_desc,
           dst.level_code,
           dst.ta_source,
           dst.is_active,
           dst.valid_from,
           dst.valid_to,
           dst.insert_dt,
           dst.update_dt )
  VALUES ( seq_dim_products_scd.NEXTVAL,
           -99,
           'n.d.',
           'n.d.',
           -99,
           'n.d.',
           src.category_id,
           src.category_desc,
           'CATEGORY',
           'ACC',
           'Y',
           SYSDATE,
           To_date ('31.12.9999', 'dd.mm.yyyy'),
           SYSDATE,
           SYSDATE ); 

Процедура похожая, только теперь для определения новых записей ставим outer join с другой стороны, со стороны целевой таблицы. Ключ product_surr_id=-333 нужен будет позже для того, чтобы узнать, вставлять запись или обновлять.

Измененные записи добавляем в список командой UNION ALL и на этот раз outer join совсем не нужен.

В завершении, обновляем старые и добавляем новые записи командой MERGE. Исторические записи в целевой таблице определяем по основному ключу product_surr_id.

Группа и продукт.

Принцип такой же.

Оформляем это всё в форме пакета: pkg_etl_dimensions.sql

Тестирование SCD2

Процедура загрузки получилась сложная, поэтому обычно её автоматизируют, в таком случае и тестировать нужно один раз.

Наиболее интересным является тест на поиск дырок или пропущенных интервалов. Это может привести позже к незаметной потере фактов, если они находятся в промежутках времени, которых нет в дименшене:

-- Search for lost periods
SELECT
  SUM (CASE WHEN valid_to!=NVL(v_f_next,TO_DATE('31.12.9999','DD.MM.YYYY'))
       THEN 1 ELSE 0 END ) AS is_lost
FROM (
   SELECT PRODUCT_SURR_ID, PRODUCT_ID, valid_to, LEAD(valid_from)
     OVER (PARTITION BY PRODUCT_ID ORDER BY valid_from ASC) AS v_f_next
   FROM DIM_PRODUCTS_SCD
  WHERE PRODUCT_ID NOT IN(-99, -98) );

Все тесты: tests.sql

Вместо вывода

Приведённая выше, форма организации исторических данных оказалась очень удобной для анализа и построения отчётов. Структура является быстрой для чтения и медленной для загрузки.

Сама программа загрузки является слишком сложной. Нобходимо учитывать время отладки и тестирования.

Таблицы в базе Oracle и других имеет смысл сжимать. В случае постоянного обновления полей база данных их распаковывает. Необходимо периодически ещё и обслуживать такие SCD2, т.к. они имеют свойство расти.

Имеет смысл организовать структуру с приоритетом на увеличение скорости записи и уменьшение времени обслуживания.