Map-reduce, and its open-source implementation Hadoop, is a parallel programming framework for running scripts and programs written in Java, C, and other languages on hundreds of nodes. It is popular with dot-com companies that operate large server farms and need to produce reports on website activity or build search indexes. Map-reduce applications often overlap with BI applications and data warehouses, but the two can also coexist: one provides parallel processing, the other a parallel database, and each subsystem’s strengths complement the other. With Teradata’s in-database processing technology, Map-reduce can serve as an MPP ETL subsystem, Map-reduce functions can run inside the EDW, or table functions can integrate directly with the Map-reduce nodes. This article illustrates a commonly used Map-reduce function running inside the Teradata EDW.

For dot-com companies such as eBay and Amazon, web logs of visitor activity are a key source for user behavior analysis. Mouse clicks on web pages are gathered into enormous data sets, and among the many analyses that can be run on them, identifying visitor sessions is often the first task.

Sessionization is a typical map-reduce use case in web data analysis. Given a set of click stream logs, we divide each user’s clicks into sessions, where a session is a run of consecutive clicks in which each click follows the previous one within a specified time interval. Once clicks are grouped this way, we can detect valuable click patterns within each session; such patterns are often used for fraud detection, ad promotion, revenue prediction, and other applications. For instance, if two clicks by a user are within 30 minutes of each other, we treat them as part of the same session, because the second click can reasonably be considered a click-through from the first. If a new click arrives more than 30 minutes after the previous one, we regard the two clicks as unrelated and treat the new one as the start of a new session.
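The gap rule can be sketched in a few lines of C. This is an illustrative sketch only: it assumes one user’s click times are already sorted and expressed as seconds, and the function name `assign_sessions` is hypothetical. A gap strictly greater than the limit starts a new session, which matches the `> *Interval_Min * 60` test used by the table UDF later in this article.

```c
#include <stddef.h>

/* Assign 1-based session numbers to ONE user's clicks.
   times[] holds click times in seconds, sorted ascending.
   A gap larger than gap_limit seconds starts a new session.
   Returns the number of sessions found (0 when n == 0). */
int assign_sessions(const long long *times, size_t n,
                    long long gap_limit, int *session_no)
{
    int current = 0;

    for (size_t i = 0; i < n; i++) {
        /* The first click always opens session 1. */
        if (i == 0 || times[i] - times[i - 1] > gap_limit)
            current++;
        session_no[i] = current;
    }
    return current;
}
```

Because the clicks are scanned once in time order, the function keeps only the previous timestamp and the current session number, which is the same state the UDF keeps in its scratch pad.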

Consider the following sample web click (web page visitor mouse activity) schema and data:

Create Table WebClicks (IP varchar(16), Click_Timestamp timestamp, URL varchar(200));

IP             Click_Timestamp      URL
153.65.20.33   2008-02-03 14:33:15  ebay.com
153.65.20.33   2008-02-03 14:44:15  dvd.shop.ebay.com
153.65.20.33   2008-02-03 14:59:08  netflix.com
153.65.9.89    2008-02-04 11:02:19  amazon.com
153.65.9.89    2008-02-04 11:33:04  amazon.com/travel
153.65.9.89    2008-02-04 11:48:27  travelocity.com

Suppose the user-specified session interval is 30 minutes. After the sessionization task runs, the output should be as follows:

Session_No  IP             Click_Timestamp      URL
1           153.65.20.33   2008-02-03 14:33:15  ebay.com
1           153.65.20.33   2008-02-03 14:44:15  dvd.shop.ebay.com
1           153.65.20.33   2008-02-03 14:59:08  netflix.com
1           153.65.9.89    2008-02-04 11:02:19  amazon.com
2           153.65.9.89    2008-02-04 11:33:04  amazon.com/travel
2           153.65.9.89    2008-02-04 11:48:27  travelocity.com

The three clicks from IP 153.65.20.33 share session number 1 because each click is within 30 minutes of the previous one. For IP 153.65.9.89, the second click (2008-02-04 11:33:04) gets a new session number because it comes more than 30 minutes after the first click (2008-02-04 11:02:19); the third click (11:48:27) is within 30 minutes of the second, so it stays in session 2.
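To make the arithmetic explicit, the following C sketch computes the gap between consecutive clicks in the sample. It assumes both clicks fall on the same calendar day, which holds for every consecutive pair above; the helper names are illustrative only.

```c
/* Seconds since midnight for a wall-clock time h:m:s. */
static long clock_seconds(int h, int m, int s)
{
    return h * 3600L + m * 60L + s;
}

/* Gap in seconds from click 1 (h1:m1:s1) to click 2 (h2:m2:s2),
   assuming both clicks happen on the same calendar day. */
long same_day_gap(int h1, int m1, int s1, int h2, int m2, int s2)
{
    return clock_seconds(h2, m2, s2) - clock_seconds(h1, m1, s1);
}
```

For the sample data the gaps are 660 s (11:00), 893 s (14:53), 1845 s (30:45), and 923 s (15:23). Only 1845 s exceeds the 1800-second limit, which is why only the 11:33:04 click opens a new session.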

This document first shows two Teradata in-database processing approaches to implementing this sessionization Map-reduce task. It then shows a UDF example that performs a more advanced analytical task on the sessionized data.

2. Standard SQL OLAP Function

Teradata supports the ANSI SQL standard OLAP functions, and we can use moving-window ordered analytics to assign either the same or a new session number to each incoming web click.

The complete SQL script is as follows:

/* Adding the interval to the previous timestamp avoids the overflow that
   (Click_Timestamp - previous) MINUTE, with its default 2-digit precision,
   hits when a gap reaches 100 minutes or more. */
with dt (IP, Click_Timestamp, URL, samesession) as
( select IP, Click_Timestamp, URL,
case when max(Click_Timestamp) over (partition by IP order by Click_Timestamp rows between 1 preceding and 1 preceding) + interval '30' minute >= Click_Timestamp then 0 else 1 end
from webclicks)
select sum(samesession) over (partition by IP order by Click_Timestamp rows unbounded preceding) as Session_No, IP, Click_Timestamp, URL
from dt
order by IP, Click_Timestamp;

The logic of this SQL is:

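• First, we create a derived table dt that copies IP and Click_Timestamp from webclicks and adds a flag column, samesession. The CASE expression looks up the preceding click from the same IP (the window frame "rows between 1 preceding and 1 preceding") and sets the flag to 0 if the current click is within 30 minutes of it, or to 1 if the click starts a new session. The first click of each IP has no preceding row, so the lookup returns NULL and the flag is 1.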

• Then, in the main SQL statement, a cumulative SUM of samesession over each IP’s clicks, ordered by timestamp, assigns Session_No: each 1 increments the session number and each 0 keeps the current one.

• Teradata’s parallel-aware optimizer generates a plan that places all clicks from the same IP address on the same AMP, so the sessionization (reduce) step runs in parallel across all AMPs. Note also that the derived table and the main query need only one scan of the input data, which can be orders of magnitude faster than an equivalent SQL statement written with self-joins.
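The two steps of the query can be mirrored outside the database to see why they produce session numbers. The C sketch below is illustrative only: it assumes rows are already sorted by (IP, timestamp), as the window’s PARTITION BY and ORDER BY would present them, with times in seconds; the function name `sessionize_two_pass` is hypothetical. The gap test is "greater than the limit starts a new session", the same rule the table UDF later uses.

```c
#include <stddef.h>
#include <string.h>

/* Rows must be sorted by (ip, ts). flag[] plays the role of samesession
   (1 = new session), session_no[] the role of Session_No. */
void sessionize_two_pass(const char *const *ip, const long long *ts, size_t n,
                         long long gap_limit, int *flag, int *session_no)
{
    /* Pass 1, the CASE expression: flag is 1 when the row has no preceding
       row in its partition (first click of an IP) or the gap is too large. */
    for (size_t i = 0; i < n; i++) {
        int first = (i == 0) || strcmp(ip[i], ip[i - 1]) != 0;
        flag[i] = (first || ts[i] - ts[i - 1] > gap_limit) ? 1 : 0;
    }

    /* Pass 2, SUM(flag) OVER (PARTITION BY ip ORDER BY ts
       ROWS UNBOUNDED PRECEDING): a running total restarted per IP. */
    int running = 0;
    for (size_t i = 0; i < n; i++) {
        if (i == 0 || strcmp(ip[i], ip[i - 1]) != 0)
            running = 0;
        running += flag[i];
        session_no[i] = running;
    }
}
```

Splitting the work into a per-row flag and a running sum is what lets SQL express sessionization without self-joins: each pass looks only at the current row and the one before it.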

This standard ANSI SQL approach requires no external programming language support and typically performs well because all of the processing stays inside the database. However, it requires the application programmer to be enough of a SQL expert to express the whole logic with the currently supported moving-window functions.

3. User-Defined Table Function with Ordered Input

In practice, many application programmers already have libraries of functions written in external programming languages such as C/C++ or Java, and would like to use these functions with Teradata for analytical tasks such as this sessionization use case.

Using the ordered-input feature for table UDFs introduced in Teradata 13.0, we show how to link an external sessionization function into Teradata to achieve high-performance in-database processing.

Suppose we have an external C source file, Sessionization.c, which implements the sessionization logic described in Section 1. The complete source code is as follows.

/*************************************/
/*
REPLACE FUNCTION Sessionize (  IP VARCHAR(16),  Click_TS TimeStamp,  Interval_Min INTEGER) 
RETURNS TABLE (Session_No INTEGER, Output_IP VARCHAR(16), Output_Click_TS TimeStamp)
LANGUAGE C
NO SQL
PARAMETER STYLE SQL
EXTERNAL NAME 'CS!Sessionization!D:\Projects\Udfs\test\Sessionization.c!F!Sessionize';

*/
/*************************************/

/*************************************/
/*Table UDF entry point: void Sessionize ( ) */
/*************************************/

#define SQL_TEXT Latin_Text
#include <sqltypes_td.h>
#include <string.h>
#include <time.h>
#include <stdio.h>

/*************************************/
/* The definition of the scratch pad */
/*************************************/


typedef struct {
	char Cur_IP [17];		/*current IP; VARCHAR(16) needs 16 chars plus the terminating NUL*/
	int Cur_SessionNo;		/*current session no*/
	char row_processed;		/*boolean variable, after new input row has been sessionized, set value to TRUE*/
	TimeStamp Lst_ClickTS;  /*last click timestamp in the current session*/
} local_ctx;


/********************************************************************/
/* This copy to SQL_TEXT fields will work to copy ASCII strings to */
/* SQL_TEXT strings (in this case error_message) for any character */
/* set mode. In other words if the SQL_TEXT is defined as */
/* Unicode_Text it will still work to give the proper error message */
/********************************************************************/

static void unicpy(SQL_TEXT *dest, char *src) {
	while (*src)
		*dest++ = *src++;
	*dest = 0;	/* terminate the copied message */
}

/********************************************************************/
/* A simple function to decide whether to increment the session number by 1 */
/********************************************************************/

static int NewSession (TimeStamp *Old_ClickTs, TimeStamp *New_ClickTs, int *Interval_Min) {

	struct tm new_ts;
	struct tm old_ts;
	time_t newTime;
	time_t oldTime;

	new_ts.tm_sec = (New_ClickTs -> seconds) / 1000000;
	new_ts.tm_min = (New_ClickTs -> minute);
	new_ts.tm_hour = (New_ClickTs -> hour);
	new_ts.tm_mday = (New_ClickTs -> day);
	new_ts.tm_mon = (New_ClickTs -> month) - 1;
	new_ts.tm_year = (New_ClickTs -> year) - 1900;
	new_ts.tm_isdst = 0;

	newTime = mktime ( & new_ts );	


	old_ts.tm_sec = (Old_ClickTs -> seconds) / 1000000;
	old_ts.tm_min = (Old_ClickTs -> minute);
	old_ts.tm_hour = (Old_ClickTs -> hour);
	old_ts.tm_mday = (Old_ClickTs -> day);
	old_ts.tm_mon = (Old_ClickTs -> month) - 1;
	old_ts.tm_year = (Old_ClickTs -> year) - 1900;
	old_ts.tm_isdst = 0;

	oldTime = mktime ( & old_ts );
	
	if ( difftime(newTime, oldTime) > *Interval_Min * 60)
		return 1;
	else
		return 0;

}

/********************************************************************/
/* A simple function to adjust the Cur_SessionNo */
/********************************************************************/

static int Asgn_SessionNo (local_ctx *info, char * IP, TimeStamp * New_ClickTs, int *Interval_Min) {
	
	if (strcmp (info -> Cur_IP , IP) != 0) {
		
		strcpy (info -> Cur_IP , IP);
		info -> Cur_SessionNo  = 1;
	
	}
	
	else if ( NewSession (&(info -> Lst_ClickTS), New_ClickTs, Interval_Min) == 1) {
		info -> Cur_SessionNo ++;
		
	}
	
	info -> Lst_ClickTS = * New_ClickTs;

	info->row_processed = 1;

	return 0;
}

/***********************************/
/* Do a reset of the context block */
/***********************************/

static void Reset(local_ctx *info) {
	info->Cur_SessionNo = 0;
	info->row_processed = 0;
	strcpy (info->Cur_IP, "");
	info->Lst_ClickTS.year = 2000;
	info->Lst_ClickTS.month = 1;
	info->Lst_ClickTS.day = 1;
	info->Lst_ClickTS.hour = 0;
	info->Lst_ClickTS.minute = 0;
	info->Lst_ClickTS.seconds = 0;
}


void Sessionize (
	
	char *IP, /* IP, to be hashed by */
	TimeStamp *Click_TS, /* click timestamp, to be local order by */
	INTEGER *Interval_Min, /*user specified input for session time period*/

	INTEGER *Output_Session_No, /* output */
	char *Output_IP, /* output */
	TimeStamp *Output_Click_TS, /* output */

	int *IP_i, 
	int *Click_TS_i,
	int *Interval_Min_i,

	int *Output_Session_No_i, 
	int *Output_IP_i, 
	int *Output_Click_TS_i, 

	char sqlstate[6],
	SQL_TEXT fncname[129],
	SQL_TEXT sfncname[129],
	SQL_TEXT error_message[257] ) {
	
	local_ctx *state_info;
	FNC_Phase Phase;
	int status;

	/* make sure the function is called in the supported context */
	switch (FNC_GetPhaseEx(&Phase, TBL_LASTROW)) {
	/***********************************************************/
	/* Process the constant expression case. Only one AMP will */
	/* participate for this example */
	/***********************************************************/
		case TBL_MODE_CONST:
			break;
		
	/****************************************/
	/* Process the varying expression case. */
	/****************************************/
		case TBL_MODE_VARY:
			switch(Phase) {
				
				case TBL_PRE_INIT:
				/* get scratch memory to use from now on */
					state_info = FNC_TblAllocCtx(sizeof(local_ctx));
					Reset(state_info);

					break;

				case TBL_INIT:
				/* Preprocess the input click row */
					state_info = FNC_TblGetCtx();

					status = Asgn_SessionNo (state_info, IP, Click_TS, Interval_Min);

					break;

				case TBL_BUILD:
					state_info = FNC_TblGetCtx();
					
					/* Have no more data; return no data sqlstate. */
					if (state_info->row_processed == 0 ) {
						strcpy(sqlstate, "02000");					
						break;
					}

					/* output sessionized row*/
					if (*Output_IP_i == 0)
						strcpy (Output_IP, IP);
					if (* Output_Click_TS_i == 0)
						* Output_Click_TS = * Click_TS;
					if (* Output_Session_No_i == 0)
						* Output_Session_No = state_info -> Cur_SessionNo;

					state_info->row_processed = 0;
					
					break;


				case TBL_BUILD_EOF:

					state_info = FNC_TblGetCtx();
					
					if (state_info->row_processed == 0 ) {
						strcpy(sqlstate, "02000");					
						break;
					}

					break;

				case TBL_FINI:
					
					break;

				case TBL_END:
					break;
				
				case TBL_ABORT:
					break;
			}
	}
}

We now show how to link this external C function into Teradata as a table UDF. First, we define the table function interface in Teradata, as follows.

REPLACE FUNCTION Sessionize (  IP Varchar(16),  Click_TS TimeStamp,  Interval_Min INTEGER) 
RETURNS TABLE (Session_No INTEGER, Output_IP Varchar(16), Output_Click_TS TimeStamp)
LANGUAGE C
NO SQL
PARAMETER STYLE SQL
EXTERNAL NAME 'CS!Sessionization!D:\Projects\Udfs\test\Sessionization.c!F!Sessionize';

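The function interface must be compatible with the external C function Sessionize ( ): it takes three input parameters (the IP address, the click timestamp, and the session interval in minutes) and returns one row per click containing the session number, the IP address, and the click timestamp.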

Once the table function has been created in Teradata, we can call it with ordered input to perform the sessionization task, as follows:

WITH clicks (IP, Click_TS) AS
(SELECT IP, Click_Timestamp FROM webclicks)
SELECT Session_No, Output_IP, Output_Click_TS 
FROM TABLE (Sessionize (clicks.IP, clicks.Click_TS, 30)
HASH BY clicks.IP LOCAL ORDER BY clicks.IP, clicks.Click_TS) tudf
ORDER BY Output_IP, Session_No

Here, the derived table clicks takes IP and Click_Timestamp from the webclicks table and passes them to the Sessionize table UDF, together with 30 as the user-specified session interval in minutes. The HASH BY and LOCAL ORDER BY clauses guarantee that all clicks from the same IP are sent to the same AMP and arrive there ordered by click timestamp, so the sessionization UDF can assign the correct session numbers.
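The effect of HASH BY and LOCAL ORDER BY can be illustrated with a small in-memory simulation. This is a sketch of the semantics only, not of Teradata internals: the `Click` struct, `hash_and_local_order`, and the bucket count `n_amps` are hypothetical, and FNV-1a stands in for Teradata’s row hash, which uses a different algorithm.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* One input row (hypothetical in-memory layout). */
typedef struct {
    const char *ip;
    long long ts;   /* click time in seconds */
    int amp;        /* bucket ("AMP") the row is routed to */
} Click;

/* FNV-1a string hash: a stand-in for Teradata's row hash. */
static uint32_t fnv1a(const char *s)
{
    uint32_t h = 2166136261u;
    for (; *s != '\0'; s++) {
        h ^= (unsigned char)*s;
        h *= 16777619u;
    }
    return h;
}

/* Order rows by bucket, then IP, then timestamp. */
static int by_amp_ip_ts(const void *a, const void *b)
{
    const Click *x = a;
    const Click *y = b;
    if (x->amp != y->amp)
        return (x->amp > y->amp) - (x->amp < y->amp);
    int c = strcmp(x->ip, y->ip);
    if (c != 0)
        return c;
    return (x->ts > y->ts) - (x->ts < y->ts);
}

/* HASH BY ip: rows with equal IPs always get the same bucket (n_amps > 0).
   LOCAL ORDER BY ip, ts: sorting the whole array by (amp, ip, ts) lays out
   each bucket's rows grouped by IP and in time order, bucket after bucket. */
void hash_and_local_order(Click *rows, size_t n, int n_amps)
{
    for (size_t i = 0; i < n; i++)
        rows[i].amp = (int)(fnv1a(rows[i].ip) % (uint32_t)n_amps);
    qsort(rows, n, sizeof rows[0], by_amp_ip_ts);
}
```

Each bucket’s run of rows corresponds to what one UDF instance would see, which is why Sessionize only needs to compare the current IP with the previous row’s IP to know when to restart the session count.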

The SQL for this approach is much simpler than the SQL OLAP query in Section 2, and Teradata’s UDF framework makes sure that the UDF is executed in parallel on all AMPs.

4. Another Sessionization Example with Teradata UDF

In this section, we show that Teradata’s user-defined table functions can also support more advanced analytical tasks on sessionized data.

Suppose we are interested in click patterns originating from the West Coast between 12 a.m. and 1 a.m. every day, and want to answer the question: for sessions with five or fewer page views, what are the most frequently visited URL sequences? In traditional data warehouse solutions, this problem would require a five-way self-join of the original web log data, which is simply infeasible for large sites such as eBay and Amazon. Alternatively, specialized SQL extensions would have to be implemented to specify the complex patterns in the FROM clause. Even where such extensions are available, the SQL statement is hard to write, error-prone, and inflexible when the logic changes slightly.

With Teradata’s table UDFs, we can implement the pattern logic in an external programming language such as C or Java, and invoke it in the FROM clause.

REPLACE FUNCTION 5ClickPattern ( Session_No Int,  IP Varchar(16),  Click_TS TimeStamp,  URL Varchar(200)) 
RETURNS TABLE (entry_URL Varchar(255), pattern Varchar(2000))
LANGUAGE C
…

WITH clicks (Session_No, IP, Click_TS, URL) AS
(SELECT * FROM Sessionized_webclicks)

SELECT Tudf.entry_URL, Tudf.pattern, count(*)
FROM TABLE (5ClickPattern (clicks.Session_No, clicks.IP, clicks.Click_TS, clicks.URL)
HASH BY clicks.IP, clicks.Session_No LOCAL ORDER BY clicks.IP, clicks.Session_No, clicks.Click_TS) 
AS Tudf (entry_URL, pattern)
GROUP BY 1, 2 ORDER BY 3 DESC; 

Here, 5ClickPattern is the table UDF that checks each session for five or fewer page views and returns the session’s entry URL (entry_URL) and its internal visit pattern (pattern). No proprietary SQL extensions are required; instead, the logic is implemented in an external programming language.
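The article does not show the body of 5ClickPattern. As a hypothetical sketch of the per-session logic such a function might apply, the C code below takes one session’s URLs in click order, skips sessions with more than `max_views` clicks, and otherwise returns the first URL as the entry URL and the URLs joined with " > " as the pattern. The function name, the separator, and the buffer handling are all assumptions; the GROUP BY and COUNT(*) in the query above then tally how often each pattern occurs.

```c
#include <stddef.h>
#include <string.h>

/* Build (entry_url, pattern) for ONE session whose URLs are in click order.
   Sessions with no clicks or more than max_views clicks produce no row.
   Returns 1 when a row is produced, 0 otherwise (including when the pattern
   would not fit in cap bytes; pattern is then not guaranteed terminated). */
int click_pattern(const char *const *urls, size_t n, size_t max_views,
                  const char **entry_url, char *pattern, size_t cap)
{
    size_t len = 0;

    if (n == 0 || n > max_views || cap == 0)
        return 0;

    for (size_t i = 0; i < n; i++) {
        size_t sep = (i == 0) ? 0 : 3;        /* length of " > " */
        size_t ulen = strlen(urls[i]);
        if (len + sep + ulen + 1 > cap)
            return 0;                        /* pattern too long for buffer */
        if (sep != 0) {
            memcpy(pattern + len, " > ", sep);
            len += sep;
        }
        memcpy(pattern + len, urls[i], ulen);
        len += ulen;
    }
    pattern[len] = '\0';
    *entry_url = urls[0];                    /* first page of the session */
    return 1;
}
```

For the first sample session (ebay.com, dvd.shop.ebay.com, netflix.com) this sketch yields the entry URL "ebay.com" and the pattern "ebay.com > dvd.shop.ebay.com > netflix.com".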

5. Conclusion

In this document, we showed that Teradata provides several features to support in-database map-reduce processing tasks such as sessionization. With the ANSI SQL OLAP moving-window functions, an SQL expert can write a single query that implements the sessionization logic. Alternatively, using the table UDF ordered-input feature of Teradata 13.0, we can take map-reduce functions written in external programming languages, link them into Teradata as table UDFs, and invoke them in the FROM clause of an SQL statement. The same feature can also be used for other, more complex map-reduce pattern detection applications.

 
