Skip to main content

Processing of last_update_date tracked rows

Introduction

Suppose it is necessary to process rows according to last_update_date using an external application. For example there might be two systems that needs to be synchronized which should be achieved through a middleware application. Requirement roughly can be described such that when last_update_date of a row changes, in other words a row is updated or a new row is added, it is necessary to take some actions, e.g. synchronize peer system.

Considering life cycle of external application, it is necessary to differentiate two phases: early phase and late phase.
Early phase is the phase when external application is first launched. Existing data must be processed. After all existing data is processed, early phase ends. Since all pre-existing data is processed, early phase is probably the phase which most processing throughput happens.
Late phase is the phase new updates are processed.  Late phase spans the external application lifecycle excluding early phase.

 

Solution


Solution requires keeping last_update_date of the last row processed as current update date. From now on current update date is encoded as CUD. CUD is a date record such that all rows having last update date earlier than CUD are processed. All rows having last update date greater than or equal to CUD needs to be processed.
At the beginning of early phase, CUD is set as a very early date which is called safe epoch. Rows are retrieved from source system ordered according to last_update_date and having last update date greater than CUD.
SELECT * FROM TABLE_1 WHERE LAST_UPDATE_DATE >= CUD ORDER BY LAST_UPDATE_DATE


As rows are processed CUD is updated to the value of last update date of last row processed. Keeping system breakdowns in mind, it is important to persist CUD. 

Above scenario results in having all items from source table at once which is not memory efficient. It is good idea to give an upper bound to number of rows retrieved, which is called fetch size. At each execution at most fetch size number of rows are retrieved and processed, which is called a page. At subsequent execution, rows coming after last page should be processed. Query below uses MYSQL dialect.
SELECT * FROM TABLE_1 WHERE LAST_UPDATE_DATE >= CUD ORDER BY LAST_UPDATE_DATE LIMIT 0, FETCH_SIZE


Problem with this approach is, large blocks having the same last update date, which is called a frame, might be larger than fetch size. In such a case, next frame is never reached. Suppose fetch size is 50 and current frame includes 150 items query above will always return first 50 items of the same frame. Solution is to use pagination for those large frames.

A page may contain many frames. Challenge is to determine whether next frame needs pagination.  Logic is to page source rows in usual way as proposed above. If a large frame is detected, this frame should be retrieved using pagination. If last and first items in a page shares the same last update date, this page contains a large frame. Next page requires pagination. Additionally it is good practice to order by a key column to ensure an already retrieved row does not show in a subsequent page.
SELECT * FROM TABLE_1 WHERE LAST_UPDATE_DATE >= CUD ORDER BY LAST_UPDATE_DATE,PK_COLUMN LIMIT FETCH_SIZE*FRAME_PAGE, FETCH_SIZE


Frame page shows the page number within the frame. If pagination is not used frame page has value 0. Here is a pseudo code of the algorithm with corner cases.
List<T> unprocessedList =
    retrieve("SELECT * FROM SOURCE_TABLE WHERE LAST_UPDATE_DATE >= "+current_update_date+" order by LAST_UPDATE_DATE, PK_COLUMN LIMIT "+(framePage * FETCH_SIZE)+","+ FETCH_SIZE);


if (unprocessedList.size() == FETCH_SIZE) {

    if (unprocessedList.get(0).getLastUpdateDate().equals(unprocessedList.get(FETCH_SIZE - 1).getLastUpdateDate())) {//frame paging necessary
        if (usingFramePaging) {
            if (!lastUpdateDate.equals(unprocessedList.get(0).getLastUpdateDate())) { // different frame : we need to continue frame paging but need to reset frame context
                framePage = 1;// 0th frame framePage is already fetched
            } else { // same frame : continue frame paging
                framePage++;
            }
        } else { // frame paging necessary but not yet active: start frame paging
            usingFramePaging = true;
            framePage = 1;// 0th frame framePage is already fetched
        }
    } else { // frame paging not necessary
        usingFramePaging = false;
        framePage = 0;
    }
} else { // frame paging not necessary any more
    usingFramePaging = false;
    framePage = 0;
}




This algorithm does not ensures interleaving is avoided. Namely some rows from earlier page might show up in subsequent page. If this is not tolerable additional control logic should be added to avoid this.
Working with last update date brings another undesired side effect. At late phase CUD shows current date. Suppose there are 5 rows updated in current day. Since rows having last update date greater than or equal to CUD, 5 items will be retrieved at each execution within the whole day. Those are called dally looping items. To avoid this situation additional control is needed. A solution can be caching items from last page. Cache should be large enough to include possible rolling items and as small as possible to reduce memory waste. If fetched rows have the same number with the rows in the cache, fetched rows are compared with the rows in the cache. If they are equal, fetched items are dropped. Comparison is necessary because same items can be updated multiple times during the day and each update should be captured. If number of fetched rows are different that the ones in the cache or they are not equal, cache content is replaced with fetched rows.



unprocessedList=// fetched rows;

// if size is below EXPECTED_DAILY_UPDATE_COUNT, it is eligible for daily looping check
if (unprocessedList.size() <= EXPECTED_DAILY_UPDATE_COUNT) {
    // if all items are all in the daily loop list
    if (possiblyDailyLoopingObjectsList.size()==unprocessedList.size() &&
        possiblyDailyLoopingObjectsList.containsAll(unprocessedList)) {
        resultList = Collections.emptyList(); // original unprocessedList comes from Dao layer and is unmodifiable (cannot call clear()).

    } else {
        resultList = unprocessedList;
        possiblyDailyLoopingObjectsList.clear();
        for (T t : unprocessedList) possiblyDailyLoopingObjectsList.add(t);
    }
}



Summary


Algorithm described is welcomed to be used as long as it refers to author.
Feel free to share your comments to http://el-harezmi.blogspot.com on design and any typos, incorrect or inaccurate expressions you see.

Comments

Popular posts from this blog

Obfuscating Spring Boot Projects Using Maven Proguard Plugin

Introduction Obfuscation is the act of reorganizing bytecode such that it becomes hard to decompile. Many developers rely on obfuscation to save their sensitive code from undesired eyes. Publishing jars without obfuscation may hinder competitiveness because rivals may take advantage of easily decompilable nature of java binaries. Objective Spring Boot applications make use of public interfaces, annotations which makes applications harder to obfuscate. Additionally, maven Spring Boot plugin creates a fat jar which contains all dependent jars. It is not viable to obfuscate the whole fat jar. Thus obfuscating Spring Boot applications is different than obfuscating regular java applications and requires a suitable strategy. Audience Those who use Spring Boot and Maven and wish to obfuscate their application using Proguard are the target audience for this article. Sample Application As the sample application, I will use elastic search synch application from my G...

Hadoop Installation Document - Standalone Mode

This document shows my experience on following apache document titled “Hadoop:Setting up a Single Node Cluster”[1] which is for Hadoop version 3.0.0-Alpha2 [2]. A. Prepare the guest environment Install VirtualBox. Create a virtual 64 bit Linux machine. Name it “ubuntul_hadoop_master”. Give it 500MB memory. Create a VMDK disc which is dynamically allocated up to 30GB. In network settings in first tab you should see Adapter 1 enabled and attached to “NAT”. In second table enable adapter 2 and attach to “Host Only Adaptor”. First adapter is required for internet connection. Second one is required for letting outside connect to a guest service. In storage settings, attach a Linux iso file to IDE channel. Use any distribution you like. Because of small installation size, I choose minimal Ubuntu iso [1]. In package selection menu, I only left standard packages selected.  Login to system.  Setup JDK. $ sudo apt-get install openjdk-8-jdk Install ssh and pdsh, if...

Java: Cost of Volatile Variables

Introduction Use of volatile variables is common among Java developers as a way of implicit synchronization. JIT compilers may reorder program execution to increase performance. Java memory model[1] constraints reordering of volatile variables. Thus volatile variable access should has a cost which is different than a non-volatile variable access. This article will not discuss technical details on use of volatile variables. Performance impact of volatile variables is explored by using a test application. Objective Exploring volatile variable costs and comparing with alternative approaches. Audience This article is written for developers who seek to have a view about cost of volatile variables. Test Configuration Test application runs read and write actions on java variables. A non volatile primitive integer, a volatile primitive integer and an AtomicInteger is tested. Non-volatile primitive integer access is controlled with ReentrantLock and ReentrantReadWriteLock  to c...