Introduction
Suppose it is necessary to process rows according to last_update_date using an external application. For example there might be two systems that needs to be synchronized which should be achieved through a middleware application. Requirement roughly can be described such that when last_update_date of a row changes, in other words a row is updated or a new row is added, it is necessary to take some actions, e.g. synchronize peer system.
Considering life cycle of external application, it is necessary to differentiate two phases: early phase and late phase.
Early phase is the phase when external application is first launched. Existing data must be processed. After all existing data is processed, early phase ends. Since all pre-existing data is processed, early phase is probably the phase which most processing throughput happens.
Late phase is the phase new updates are processed. Late phase spans the external application lifecycle excluding early phase.
Solution
Solution requires keeping last_update_date of the last row processed as current update date. From now on current update date is encoded as CUD. CUD is a date record such that all rows having last update date earlier than CUD are processed. All rows having last update date greater than or equal to CUD needs to be processed.
At the beginning of early phase, CUD is set as a very early date which is called safe epoch. Rows are retrieved from source system ordered according to last_update_date and having last update date greater than CUD.
SELECT * FROM TABLE_1 WHERE LAST_UPDATE_DATE >= CUD ORDER BY LAST_UPDATE_DATE
As rows are processed CUD is updated to the value of last update date of last row processed. Keeping system breakdowns in mind, it is important to persist CUD.
Above scenario results in having all items from source table at once which is not memory efficient. It is good idea to give an upper bound to number of rows retrieved, which is called fetch size. At each execution at most fetch size number of rows are retrieved and processed, which is called a page. At subsequent execution, rows coming after last page should be processed. Query below uses MYSQL dialect.
SELECT * FROM TABLE_1 WHERE LAST_UPDATE_DATE >= CUD ORDER BY LAST_UPDATE_DATE LIMIT 0, FETCH_SIZE
Problem with this approach is, large blocks having the same last update date, which is called a frame, might be larger than fetch size. In such a case, next frame is never reached. Suppose fetch size is 50 and current frame includes 150 items query above will always return first 50 items of the same frame. Solution is to use pagination for those large frames.
A page may contain many frames. Challenge is to determine whether next frame needs pagination. Logic is to page source rows in usual way as proposed above. If a large frame is detected, this frame should be retrieved using pagination. If last and first items in a page shares the same last update date, this page contains a large frame. Next page requires pagination. Additionally it is good practice to order by a key column to ensure an already retrieved row does not show in a subsequent page.
SELECT * FROM TABLE_1 WHERE LAST_UPDATE_DATE >= CUD ORDER BY LAST_UPDATE_DATE,PK_COLUMN LIMIT FETCH_SIZE*FRAME_PAGE, FETCH_SIZE
Frame page shows the page number within the frame. If pagination is not used frame page has value 0. Here is a pseudo code of the algorithm with corner cases.
List<T> unprocessedList = retrieve("SELECT * FROM SOURCE_TABLE WHERE LAST_UPDATE_DATE >= "+current_update_date+" order by LAST_UPDATE_DATE, PK_COLUMN LIMIT "+(framePage * FETCH_SIZE)+","+ FETCH_SIZE); if (unprocessedList.size() == FETCH_SIZE) { if (unprocessedList.get(0).getLastUpdateDate().equals(unprocessedList.get(FETCH_SIZE - 1).getLastUpdateDate())) {//frame paging necessary if (usingFramePaging) { if (!lastUpdateDate.equals(unprocessedList.get(0).getLastUpdateDate())) { // different frame : we need to continue frame paging but need to reset frame context framePage = 1;// 0th frame framePage is already fetched } else { // same frame : continue frame paging framePage++; } } else { // frame paging necessary but not yet active: start frame paging usingFramePaging = true; framePage = 1;// 0th frame framePage is already fetched } } else { // frame paging not necessary usingFramePaging = false; framePage = 0; } } else { // frame paging not necessary any more usingFramePaging = false; framePage = 0; }
This algorithm does not ensures interleaving is avoided. Namely some rows from earlier page might show up in subsequent page. If this is not tolerable additional control logic should be added to avoid this.
Working with last update date brings another undesired side effect. At late phase CUD shows current date. Suppose there are 5 rows updated in current day. Since rows having last update date greater than or equal to CUD, 5 items will be retrieved at each execution within the whole day. Those are called dally looping items. To avoid this situation additional control is needed. A solution can be caching items from last page. Cache should be large enough to include possible rolling items and as small as possible to reduce memory waste. If fetched rows have the same number with the rows in the cache, fetched rows are compared with the rows in the cache. If they are equal, fetched items are dropped. Comparison is necessary because same items can be updated multiple times during the day and each update should be captured. If number of fetched rows are different that the ones in the cache or they are not equal, cache content is replaced with fetched rows.
unprocessedList=// fetched rows; // if size is below EXPECTED_DAILY_UPDATE_COUNT, it is eligible for daily looping check if (unprocessedList.size() <= EXPECTED_DAILY_UPDATE_COUNT) { // if all items are all in the daily loop list if (possiblyDailyLoopingObjectsList.size()==unprocessedList.size() && possiblyDailyLoopingObjectsList.containsAll(unprocessedList)) { resultList = Collections.emptyList(); // original unprocessedList comes from Dao layer and is unmodifiable (cannot call clear()). } else { resultList = unprocessedList; possiblyDailyLoopingObjectsList.clear(); for (T t : unprocessedList) possiblyDailyLoopingObjectsList.add(t); } }
Summary
Algorithm described is welcomed to be used as long as it refers to author.
Feel free to share your comments to http://el-harezmi.blogspot.com on design and any typos, incorrect or inaccurate expressions you see.
Comments
Post a Comment