को रोक नहीं है मुझे दो मैपरेडस नौकरियों की श्रृंखला की आवश्यकता है। मैंने जॉब कंट्रोल का इस्तेमाल जॉब 2 को जॉब 1 के आश्रित के रूप में सेट करने के लिए किया था। यह काम करता है, आउटपुट फाइलें बनाई जाती हैं !! लेकिन यह रुकता नहीं है! खोल में यह इस राज्य में रहता है:(हाडोप) मैपरेडस - चेन जॉब्स - जॉबकंट्रोल
12/09/11 19:06:24 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:06:25 INFO input.FileInputFormat: Total input paths to process : 1
12/09/11 19:06:25 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/09/11 19:06:25 WARN snappy.LoadSnappy: Snappy native library not loaded
12/09/11 19:07:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:07:00 INFO input.FileInputFormat: Total input paths to process : 1
मैं इसे कैसे रोक सकता है? यह मेरा मुख्य है।
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Configuration conf2 = new Configuration();
Job job1 = new Job(conf, "canzoni");
job1.setJarByClass(CanzoniOrdinate.class);
job1.setMapperClass(CanzoniMapper.class);
job1.setReducerClass(CanzoniReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
ControlledJob cJob1 = new ControlledJob(conf);
cJob1.setJob(job1);
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, new Path("/user/hduser/tmp"));
Job job2 = new Job(conf2, "songsort");
job2.setJarByClass(CanzoniOrdinate.class);
job2.setMapperClass(CanzoniSorterMapper.class);
job2.setSortComparatorClass(ReverseOrder.class);
job2.setInputFormatClass(KeyValueTextInputFormat.class);
job2.setReducerClass(CanzoniSorterReducer.class);
job2.setMapOutputKeyClass(IntWritable.class);
job2.setMapOutputValueClass(Text.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
ControlledJob cJob2 = new ControlledJob(conf2);
cJob2.setJob(job2);
FileInputFormat.addInputPath(job2, new Path("/user/hduser/tmp/part*"));
FileOutputFormat.setOutputPath(job2, new Path(args[1]));
JobControl jobctrl = new JobControl("jobctrl");
jobctrl.addJob(cJob1);
jobctrl.addJob(cJob2);
cJob2.addDependingJob(cJob1);
jobctrl.run();
////////////////
// NEW CODE ///
//////////////
// delete jobctrl.run();
Thread t = new Thread(jobctrl);
t.start();
String oldStatusJ1 = null;
String oldStatusJ2 = null;
while (!jobctrl.allFinished()) {
String status =cJob1.toString();
String status2 =cJob2.toString();
if (!status.equals(oldStatusJ1)) {
System.out.println(status);
oldStatusJ1 = status;
}
if (!status2.equals(oldStatusJ2)) {
System.out.println(status2);
oldStatusJ2 = status2;
}
}
System.exit(0);
} }
मैं एक थ्रेड का उपयोग कर JobControl शुरू करने के लिए इसे हल। मैंने जांच की कि कुछ समय चक्र का उपयोग करके नौकरियां की गई हैं: जबकि (! Jobctrl.allFinished()) और एक system.exit() चक्र से बाहर है। अब मैं उन नौकरियों को सूचना संदेश लौटा दूंगा, जो मैंने प्राप्त किया है, यह जानना है कि कौन सी नौकरी चल रही है, ControlledJob.toString() के साथ। मुझे नहीं पता कि सूचना संदेश कैसे प्राप्त करते हैं: मैपर कार्य की संख्या, कार्य को कम करने की संख्या, इनपुट में रिकॉर्ड या आउटपुट आदि ... इन संदेशों को प्राप्त करने का कोई विचार? –
"job.getCounters()। ToString()" पर्याप्त है? – zsxwing
क्या यह जॉबकंट्रोल कक्षा में एक बग है? – Rags