Použil jsem příklad z webu a dokázal jsem přenést soubory z Mongo do HDFS a naopak. Nemohl jsem si teď zjistit přesnou webovou stránku. Ale program vypadá níže.
Můžete z toho dostat jiskru a jít dál.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.bson.BSONObject;
import org.bson.types.ObjectId;
import com.mongodb.hadoop.MongoInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;
public class CopyFromMongodbToHDFS {
public static class ImportWeblogsFromMongo extends
Mapper<LongWritable, Text, Text, Text> {
public void map(Object key, BSONObject value, Context context)
throws IOException, InterruptedException {
System.out.println("Key: " + key);
System.out.println("Value: " + value);
String md5 = value.get("md5").toString();
String url = value.get("url").toString();
String date = value.get("date").toString();
String time = value.get("time").toString();
String ip = value.get("ip").toString();
String output = "\t" + url + "\t" + date + "\t" + time + "\t" + ip;
context.write(new Text(md5), new Text(output));
}
}
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
MongoConfigUtil.setInputURI(conf,
"mongodb://127.0.0.1:27017/test.mylogs");
System.out.println("Configuration: " + conf);
@SuppressWarnings("deprecation")
Job job = new Job(conf, "Mongo Import");
Path out = new Path("/user/cloudera/test1/logs.txt");
FileOutputFormat.setOutputPath(job, out);
job.setJarByClass(CopyFromMongodbToHDFS.class);
job.setMapperClass(ImportWeblogsFromMongo.class);
job.setOutputKeyClass(ObjectId.class);
job.setOutputValueClass(BSONObject.class);
job.setInputFormatClass(MongoInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}