2013-02-01 28 views
7

के साथ हडोप में सफल रिकॉर्ड के बीच मतभेदों की गणना करें मेरे पास एक हाइव टेबल है जिसमें ग्राहक कॉल का डेटा है। सादगी के लिए मान लें कि इसमें 2 कॉलम हैं, पहले कॉलम में ग्राहक आईडी है और दूसरा कॉलम कॉल का टाइमस्टैम्प रखता है (यूनिक्स टाइमस्टैम्प)। प्रत्येक ग्राहक के लिएहाइड क्वॉरीज़

Customer1 timestamp11 
Customer1 timestamp12 
Customer1 timestamp13 
Customer2 timestamp21 
Customer3 timestamp31 
Customer3 timestamp32 
... 

क्या यह संभव है एक हाइव क्वेरी कि रिटर्न बनाने के लिए,:

SELECT * FROM mytable SORT BY customer_id, call_time; 

परिणाम है:

मैं प्रत्येक ग्राहक के लिए सभी कॉल्स खोजने के लिए इस तालिका क्वेरी कर सकता है , दूसरी कॉल से शुरू, दो सफल कॉल के बीच समय अंतराल? ऊपर उदाहरण के लिए कि क्वेरी लौटना चाहिए:

Customer1 timestamp12-timestamp11 
Customer1 timestamp13-timestamp12 
Customer3 timestamp32-timestamp31 
... 

मैं sql solution से समाधान अनुकूल करने के लिए कोशिश की है, लेकिन मैं हाइव सीमाओं के साथ अटक कर रहा हूँ: it accepts subqueries only in FROM और joins must contain only equalities

धन्यवाद।

EDIT1:

मैं एक हाइव यूडीएफ फ़ंक्शन का उपयोग करने की कोशिश की:

public class DeltaComputerUDF extends UDF { 
private String previousCustomerId; 
private long previousCallTime; 

public String evaluate(String customerId, LongWritable callTime) { 
    long callTimeValue = callTime.get(); 
    String timeDifference = null; 

    if (customerId.equals(previousCustomerId)) { 
     timeDifference = new Long(callTimeValue - previousCallTime).toString(); 
    } 

    previousCustomerId = customerId; 
    previousCallTime = callTimeValue; 

    return timeDifference; 
}} 

और इसका इस्तेमाल नाम "डेल्टा" के साथ।

लेकिन ऐसा लगता है (लॉग और परिणाम से) कि इसका उपयोग एमएपी समय पर किया जा रहा है। 2 समस्याओं इस से उत्पन्न होती हैं:

पहले: तालिका डेटा इस फ़ंक्शन का उपयोग करने से पहले ग्राहक आईडी और टाइमस्टैम्प के अनुसार क्रमबद्ध किया जाना चाहिए। क्वेरी:

SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time; 

काम नहीं करता है क्योंकि सॉर्टिंग भाग REDUCE समय पर किया जाता है, मेरे फ़ंक्शन का उपयोग होने के कुछ समय बाद।

मैं फ़ंक्शन का उपयोग करने से पहले तालिका डेटा को सॉर्ट कर सकता हूं, लेकिन मैं इससे खुश नहीं हूं क्योंकि यह एक ओवरहेड है जिसे मैं टालना चाहता हूं।

दूसरा: एक वितरित Hadoop विन्यास के मामले में, डेटा उपलब्ध नौकरी ट्रैकर के बीच विभाजित है। इसलिए मेरा मानना ​​है कि इस फ़ंक्शन के कई उदाहरण होंगे, प्रत्येक मैपर के लिए, इसलिए 2 मैपर के बीच एक ही ग्राहक डेटा विभाजित होना संभव है। इस मामले में मैं ग्राहक कॉल खो दूंगा, जो स्वीकार्य नहीं है।

मुझे नहीं पता कि इस समस्या को कैसे हल किया जाए। मुझे पता है कि वितरण द्वारा यह सुनिश्चित किया जाता है कि एक विशिष्ट मूल्य वाले सभी डेटा एक ही reducer को भेजा जाता है (इस प्रकार यह सुनिश्चित करना कि एसओआरटी अपेक्षित काम करता है), क्या किसी को पता है कि मैपर के लिए कुछ ऐसा है या नहीं?

अगला मैं कम स्क्रिप्ट का उपयोग करने के लिए libjack के सुझाव का पालन करने की योजना बना रहा हूं। कुछ अन्य हाइव प्रश्नों के बीच यह "गणना" की आवश्यकता है, इसलिए मैं बालास्वामी वाडेमन द्वारा सुझाए गए अनुसार, किसी अन्य टूल पर जाने से पहले, सब कुछ हाइव ऑफ़र का प्रयास करना चाहता हूं।

EDIT2:

मैं कस्टम स्क्रिप्ट समाधान की जांच के लिए शुरू कर दिया।

स्ट्रीमिंग आमतौर पर एक समान UDFs या InputFormat वस्तुओं कोडिंग की तुलना में कम कुशल है: लेकिन, प्रोग्रामिंग हाइव पुस्तक में 14 अध्याय (इस अध्याय कस्टम स्क्रिप्ट प्रस्तुत करता है) के प्रथम पृष्ठ में, मैं निम्नलिखित पैरा पाया। पाइप से बाहर में इसे पारित करने के लिए डेटा को क्रमबद्ध और deserializing अपेक्षाकृत अक्षम है। एक एकीकृत तरीके से पूरे प्रोग्राम को डीबग करना भी कठिन होता है। हालांकि, यह तेजी से प्रोटोटाइप और जावा में लिखे गए मौजूदा कोड का लाभ उठाने के लिए उपयोगी नहीं है। हाइव उपयोगकर्ताओं के लिए जो जावा कोड लिखना नहीं चाहते हैं, यह बहुत प्रभावी दृष्टिकोण हो सकता है।

तो यह स्पष्ट था कि कस्टम स्क्रिप्ट दक्षता के मामले में सबसे अच्छा समाधान नहीं है।

लेकिन मुझे अपना यूडीएफ फ़ंक्शन कैसे रखना चाहिए, लेकिन यह सुनिश्चित करें कि यह एक वितरित हडोप कॉन्फ़िगरेशन में अपेक्षित काम करता है? मुझे भाषा मैनुअल यूडीएफ विकी पेज के यूडीएफ आंतरिक अनुभाग में इस प्रश्न का उत्तर मिला। अगर मैं अपने प्रश्न लिखें:

SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t; 

यह समय को कम करने और निर्माणों के द्वारा और एक तरह वितरित की गारंटी है कि एक ही ग्राहक से सभी रिकॉर्ड एक ही कम करने के द्वारा संसाधित किया जा रहा है, कॉल के क्रम में कम से क्रियान्वित किया जाता है।

तो उपरोक्त यूडीएफ और यह क्वेरी मेरी समस्या का समाधान करती है।

+0

मुझे लगता है कि यह [इस सवाल] के समान है (http://stackoverflow.com/questions/14028796/reduce-a-set-of-rows-in-hive-to-another-set-of-rows) मैंने एक कस्टम मानचित्र/हाइव में कम करने के साथ उत्तर दिया। आपको बस उचित कमी स्क्रिप्ट प्रदान करनी होगी। – libjack

+0

मुझे नहीं पता कि यह हाइव में कैसे करें लेकिन ऐसा करने के लिए कैस्केडिंग एपीआई है। कैस्केडिंग में बफर कहा जाता है। Http: //docs.cascading.org/cascading/2.0/userguide/html/ch05s05.html –

उत्तर

11

यह एक पुराने सवाल है, लेकिन भविष्य में संदर्भ के लिए, मैं यहाँ एक और प्रस्ताव लिखें:

हाइव Windowing functions अपने प्रश्न में पिछली/अगली मानों का उपयोग करने की अनुमति देता है।

एक simili कोड क्वेरी हो सकता है:

चयन CUSTOMER_ID, लैग (call_time, 1, 0) (CUSTOMER_ID आदेश द्वारा PARTITION call_time पंक्तियों 1 के पिछले द्वारा) से अधिक - MyTable से call_time;

0

हो सकता है कि किसी को एक समान आवश्यकता सामना करता है, समाधान मैंने पाया निम्नलिखित है (लिंक जोड़ने नहीं है, लेकिन मैं क्योंकि मैं काफी प्रतिष्ठा अंक नहीं हैं यह करने के लिए अनुमति नहीं कर रहा हूँ के लिए खेद है):

package com.example; 
// imports (they depend on the hive version) 
@Description(name = "delta", value = "_FUNC_(customer id column, call time column) " 
    + "- computes the time passed between two succesive records from the same customer. " 
    + "It generates 3 columns: first contains the customer id, second contains call time " 
    + "and third contains the time passed from the previous call. This function returns only " 
    + "the records that have a previous call from the same customer (requirements are not applicable " 
    + "to the first call)", extended = "Example:\n> SELECT _FUNC_(customer_id, call_time) AS" 
    + "(customer_id, call_time, time_passed) FROM (SELECT customer_id, call_time FROM mytable " 
    + "DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;") 
public class DeltaComputerUDTF extends GenericUDTF { 
private static final int NUM_COLS = 3; 

private Text[] retCols; // array of returned column values 
private ObjectInspector[] inputOIs; // input ObjectInspectors 
private String prevCustomerId; 
private Long prevCallTime; 

@Override 
public StructObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException { 
    if (ois.length != 2) { 
     throw new UDFArgumentException(
       "There must be 2 arguments: customer Id column name and call time column name"); 
    } 

    inputOIs = ois; 

    // construct the output column data holders 
    retCols = new Text[NUM_COLS]; 
    for (int i = 0; i < NUM_COLS; ++i) { 
     retCols[i] = new Text(); 
    } 

    // construct output object inspector 
    List<String> fieldNames = new ArrayList<String>(NUM_COLS); 
    List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(NUM_COLS); 
    for (int i = 0; i < NUM_COLS; ++i) { 
     // column name can be anything since it will be named by UDTF as clause 
     fieldNames.add("c" + i); 
     // all returned type will be Text 
     fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); 
    } 

    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); 
} 

@Override 
public void process(Object[] args) throws HiveException { 
    String customerId = ((StringObjectInspector) inputOIs[0]).getPrimitiveJavaObject(args[0]); 
    Long callTime = ((LongObjectInspector) inputOIs[1]).get(args[1]); 

    if (customerId.equals(prevCustomerId)) { 
     retCols[0].set(customerId); 
     retCols[1].set(callTime.toString()); 
     retCols[2].set(new Long(callTime - prevCallTime).toString()); 
     forward(retCols); 
    } 

    // Store the current customer data, for the next line 
    prevCustomerId = customerId; 
    prevCallTime = callTime; 
} 

@Override 
public void close() throws HiveException { 
    // TODO Auto-generated method stub 

} 

} 

2) एक जार इस समारोह युक्त बनाएँ:

1) एक कस्टम समारोह बनाएँ। मान लीजिए कि jarname myjar.jar है।

3) जार को हाइव के साथ मशीन पर कॉपी करें। मान लीजिए यह/tmp

4) में रखा गया है हाइव के अंदर कस्टम समारोह को परिभाषित करें:

SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) FROM 
    (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t; 

टिप्पणी::

ADD JAR /tmp/myjar.jar; 
CREATE TEMPORARY FUNCTION delta AS 'com.example.DeltaComputerUDTF'; 

5) क्वेरी निष्पादित ए। मैंने माना कि कॉल_टाइम कॉलम डेटा को बिगिन के रूप में संग्रहीत करता है। यदि यह स्ट्रिंग है, तो प्रक्रिया फ़ंक्शन में हम इसे स्ट्रिंग के रूप में पुनर्प्राप्त करते हैं (जैसा कि हम ग्राहक आईडी के साथ करते हैं), फिर इसे लंबे

बी पर पार्स करें। मैंने यूडीएफ के बजाय यूडीटीएफ का उपयोग करने का फैसला किया क्योंकि इस तरह से यह आवश्यक सभी डेटा उत्पन्न करता है। अन्यथा (यूडीएफ के साथ) जेनरेट किए गए डेटा को नल मानों को छोड़ने के लिए फ़िल्टर किया जाना चाहिए।तो, यूडीएफ समारोह (DeltaComputerUDF) मूल पोस्ट के पहले संपादित में वर्णित के साथ, क्वेरी होगा:

SELECT customer_id, call_time, time_difference 
FROM 
    (
    SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) 
    FROM 
     (
     SELECT customer_id, call_time FROM mytable 
     DISTRIBUTE BY customer_id 
     SORT BY customer_id, call_time 
     ) t 
    ) u 
WHERE time_difference IS NOT NULL; 

सी। दोनों फ़ंक्शंस (यूडीएफ और यूडीटीएफ) वांछित के रूप में काम करते हैं, भले ही तालिका के अंदर पंक्तियों का क्रम कोई फर्क नहीं पड़ता (इसलिए डेल्टा फ़ंक्शंस का उपयोग करने से पहले ग्राहक आईडी द्वारा सॉर्ट किए जाने वाले टेबल डेटा और कॉल समय को कोई आवश्यकता नहीं है)

1

आप इसका उपयोग कर सकते हैं जावा या पायथन जैसी अन्य प्रोग्रामिंग भाषा के साथ स्पष्ट MAP-REDUCE। मानचित्र से बाहर निकलें {cutomer_id,call_time} और रेड्यूसर में आपको {customer_id,list{time_stamp}} मिलेगा और रेड्यूसर में आप इन टाइम टिकटों को सॉर्ट कर सकते हैं और डेटा को संसाधित कर सकते हैं।