eXpress 1.5

src/main.cpp

00001 
00009 #include <boost/unordered_map.hpp>
00010 #include <boost/unordered_set.hpp>
00011 #include <boost/filesystem.hpp>
00012 #include <boost/program_options.hpp>
00013 #include <boost/thread.hpp>
00014 #include <iostream>
00015 #include <iomanip>
00016 #include <fstream>
00017 #include <stdio.h>
00018 #include <stdlib.h>
00019 #include <time.h>
00020 #include "main.h"
00021 #include "bundles.h"
00022 #include "targets.h"
00023 #include "lengthdistribution.h"
00024 #include "fragments.h"
00025 #include "biascorrection.h"
00026 #include "mismatchmodel.h"
00027 #include "mapparser.h"
00028 #include "threadsafety.h"
00029 #include "robertsfilter.h"
00030 #include "directiondetector.h"
00031 #include "library.h"
00032 
00033 #ifdef PROTO
00034   #include PROTO_ALIGNMENT_INCL
00035   #include PROTO_TARGET_INCL
00036   #include <boost/archive/iterators/base64_from_binary.hpp>
00037   #include <boost/archive/iterators/transform_width.hpp>
00038   #include <boost/archive/iterators/ostream_iterator.hpp>
00039 #endif
00040 
00041 #ifndef WIN32
00042   #include "update_check.h"
00043 #endif
00044 
00045 using namespace std;
00046 namespace po = boost::program_options;
00047 namespace fs = boost::filesystem;
00048 
00049 Logger logger;
00050 
00051 // the forgetting factor parameter controls the growth of the fragment mass
00052 double ff_param = 0.85;
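// As applied per fragment in threaded_calc_abundances() below, the running
// log mass is updated as
//   mass_n += ff_param*log((double)n-1) - log(pow(n,ff_param) - 1);
// so ff_param = 1 weights all fragments equally (the batch-mode setting),
// while smaller values (down to the allowed 0.5) let the mass grow and weight
// more recent fragments more heavily.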
00053 
00054 // the burn-in parameter determines how many fragments are required before
00055 // the error and bias models are applied to probabilistic assignment
00056 size_t burn_in = 100000;
00057 size_t burn_out = 5000000;
00058 bool burned_out = false;
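// Once burn_in fragments have been seen, the asynchronous bias update thread
// is started and the mismatch (error) table is activated; once burn_out
// fragments have been seen, those auxiliary parameters are frozen and, in
// multi-threaded runs, the extra processing threads are launched (see
// threaded_calc_abundances() below).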
00059 
00060 size_t max_read_len = 250;
00061 
00062 size_t max_indel_size = 10;
00063 
00064 size_t stop_at = 0;
00065 
00066 // file location parameters
00067 string output_dir = ".";
00068 string fasta_file_name = "";
00069 string in_map_file_names = "";
00070 string param_file_name = "";
00071 string haplotype_file_name = "";
00072 
00073 // initial pseudo-count parameters (non-logged)
00074 double expr_alpha = .005;
00075 double fld_alpha = 1;
00076 double bias_alpha = 1;
00077 double mm_alpha = 1;
00078 
00079 size_t bias_model_order = 3;
00080 
00081 // fragment length parameters
00082 size_t def_fl_max = 800;
00083 size_t def_fl_mean = 200;
00084 size_t def_fl_stddev = 80;
00085 size_t def_fl_kernel_n = 4;
00086 double def_fl_kernel_p = 0.5;
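// These defaults seed the prior LengthDistribution built in estimation_main()
// when no --aux-param-file is supplied; the mean and standard deviation can be
// overridden on the command line with -m and -s.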
00087 
00088 // option parameters
00089 bool edit_detect = false;
00090 bool error_model = true;
00091 bool bias_correct = true;
00092 bool calc_covar = false;
00093 bool output_align_prob = false;
00094 bool output_align_samp = false;
00095 bool output_running_rounds = false;
00096 bool output_running_reads = false;
00097 size_t num_threads = 2;
00098 size_t num_neighbors = 0;
00099 size_t library_size = 0;
00100 
00101 // directional parameters
00102 Direction direction = BOTH;
00103 
00104 bool running = true;
00105 
00106 // used for multiple rounds of EM
00107 bool first_round = true;
00108 bool last_round = true;
00109 bool batch_mode = false;
00110 bool online_additional = false;
00111 bool both = false;
00112 size_t remaining_rounds = 0;
00113 
00114 bool spark_pre = false;
00115 
00116 typedef boost::unordered_map<string, double> AlphaMap;
00117 AlphaMap* expr_alpha_map = NULL;
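// Optional per-target prior strengths keyed by target name, filled by
// parse_priors() from the hidden --prior-params option and passed to the
// TargetTable constructor alongside the flat expr_alpha prior.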
00118 
00124 AlphaMap* parse_priors(string in_file) {
00125   ifstream ifs(in_file.c_str());
00126   if (!ifs.is_open()) {
00127     logger.severe("Unable to open input priors file '%s'.", in_file.c_str());
00128   }
00129   AlphaMap* alphas = new AlphaMap();
00130 
00131   string line;
00132 
00133   while(ifs.good()) {
00134     getline(ifs,line);
00135 
00136     size_t idx = line.find_first_of("\t ");
00137     if (idx!=string::npos) {
00138       string name = line.substr(0,idx);
00139       string val = line.substr(idx+1);
00140       (*alphas)[name] = atof(val.c_str());
00141     }
00142   }
00143   return alphas;
00144 }
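// parse_priors() expects one target per line: the target name, then the
// (non-logged) prior value, separated by the first tab or space. For example
// (target names here are purely illustrative):
//
//   target_A	2.5
//   target_B	0.1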
00145 
00152 bool parse_options(int ac, char ** av) {
00153 
00154   size_t additional_online = 0;
00155   size_t additional_batch = 0;
00156   
00157   po::options_description standard("Standard Options");
00158   standard.add_options()
00159   ("help,h", "produce help message")
00160   ("output-dir,o", po::value<string>(&output_dir)->default_value(output_dir),
00161    "write all output files to this directory")
00162 #ifdef PROTO
00163   ("preprocess,D", "run preprocess script for eXpressD")
00164 #endif
00165   ("frag-len-mean,m", po::value<size_t>(&def_fl_mean)->default_value(def_fl_mean),
00166    "prior estimate for average fragment length")
00167   ("frag-len-stddev,s",
00168    po::value<size_t>(&def_fl_stddev)->default_value(def_fl_stddev),
00169    "prior estimate for fragment length std deviation")
00170   ("haplotype-file,H",
00171    po::value<string>(&haplotype_file_name)->default_value(haplotype_file_name),
00172    "path to a file containing haplotype pairs")
00173   ("additional-batch,B",
00174    po::value<size_t>(&additional_batch)->default_value(additional_batch),
00175    "number of additional batch EM rounds after initial online round")
00176   ("additional-online,O",
00177    po::value<size_t>(&additional_online)->default_value(additional_online),
00178    "number of additional online EM rounds after initial online round")
00179   ("max-read-len,L",
00180    po::value<size_t>(&max_read_len)->default_value(max_read_len),
00181    "maximum allowed length of a read")
00182   ("output-align-prob",
00183    "output alignments (sam/bam) with probabilistic assignments")
00184   ("output-align-samp",
00185    "output alignments (sam/bam) with sampled assignments")
00186   ("fr-stranded",
00187    "accept only forward->reverse alignments (second-stranded protocols)")
00188   ("rf-stranded",
00189    "accept only reverse->forward alignments (first-stranded protocols)")
00190   ("f-stranded",
00191    "accept only forward single-end alignments (second-stranded protocols)")
00192   ("r-stranded",
00193    "accept only reverse single-end alignments (first-stranded protocols)")
00194   ("no-update-check", "disables automatic check for update via web")
00195   ("logtostderr", "prints all logging messages to stderr")
00196   ;
00197   
00198   po::options_description advanced("Advanced Options");
00199   advanced.add_options()
00200   ("forget-param,f", po::value<double>(&ff_param)->default_value(ff_param),
00201    "sets the 'forgetting factor' parameter (0.5 < c <= 1)")
00202   ("library-size", po::value<size_t>(&library_size),
00203    "specifies library size for FPKM instead of calculating from alignments")
00204   ("max-indel-size",
00205    po::value<size_t>(&max_indel_size)->default_value(max_indel_size),
00206    "sets the maximum allowed indel size, affecting geometric indel prior")
00207   ("calc-covar", "calculate and output covariance matrix")
00208   ("expr-alpha", po::value<double>(&expr_alpha)->default_value(expr_alpha),
00209    "sets the strength of the prior, per bp")
00210   ("stop-at", po::value<size_t>(&stop_at)->default_value(stop_at),
00211    "sets the number of fragments to process, disabled with 0")
00212   ("burn-out", po::value<size_t>(&burn_out)->default_value(burn_out),
00213    "sets number of fragments after which to stop updating auxiliary parameters")
00214   ("no-bias-correct", "disables bias correction")
00215   ("no-error-model", "disables error modelling")
00216   ("aux-param-file",
00217    po::value<string>(&param_file_name)->default_value(param_file_name),
00218    "path to file containing auxiliary parameters to use instead of learning")
00219   ;
00220 
00221   string prior_file = "";
00222 
00223   po::options_description hidden("Experimental/Debug Options");
00224   hidden.add_options()
00225   ("num-threads,p", po::value<size_t>(&num_threads)->default_value(num_threads),
00226    "number of threads (>= 2)")
00227   ("edit-detect","")
00228   ("single-round", "")
00229   ("output-running-rounds", "")
00230   ("output-running-reads", "")
00231   ("batch-mode","")
00232   ("both","")
00233   ("prior-params", po::value<string>(&prior_file)->default_value(""), "")
00234   ("sam-file", po::value<string>(&in_map_file_names)->default_value(""), "")
00235   ("fasta-file", po::value<string>(&fasta_file_name)->default_value(""), "")
00236   ("num-neighbors", po::value<size_t>(&num_neighbors)->default_value(0), "")
00237   ("bias-model-order",
00238    po::value<size_t>(&bias_model_order)->default_value(bias_model_order),
00239    "sets the order of the Markov chain used to model sequence bias")
00240   ;
00241 
00242   po::positional_options_description positional;
00243   positional.add("fasta-file",1).add("sam-file",1);
00244 
00245   po::options_description cmdline_options;
00246   cmdline_options.add(standard).add(advanced).add(hidden);
00247 
00248   bool error = false;
00249   po::variables_map vm;
00250   try {
00251     po::store(po::command_line_parser(ac, av).options(cmdline_options)
00252               .positional(positional).run(), vm);
00253   } catch (po::error& e) {
00254     logger.info("Command-Line Argument Error: %s.", e.what());
00255     error = true;
00256   }
00257   po::notify(vm);
00258 
00259   if (ff_param > 1.0 || ff_param < 0.5) {
00260     logger.info("Command-Line Argument Error: forget-param/f option must be "
00261                 "between 0.5 and 1.0.");
00262     error = true;
00263   }
00264 
00265   if (fasta_file_name == "") {
00266     logger.info("Command-Line Argument Error: target sequence fasta file "
00267                 "required.");
00268     error = true;
00269   }
00270 
00271   if (error || vm.count("help")) {
00272     cerr << "express v" << PACKAGE_VERSION << endl
00273          << "-----------------------------\n"
00274          << "File Usage:  express [options] <target_seqs.fa> <hits.(sam/bam)>\n"
00275          << "Piped Usage: bowtie [options] -S <index> <reads.fq> | express "
00276          << "[options] <target_seqs.fa>\n\n"
00277          << "Required arguments:\n"
00278          << " <target_seqs.fa>     target sequence file in fasta format\n"
00279          << " <hits.(sam/bam)>     read alignment file in SAM or BAM format\n\n"
00280          << standard
00281          << advanced;
00282     return 1;
00283   }
00284 
00285   if (param_file_name.size()) {
00286     burn_in = 0;
00287     burn_out = 0;
00288     burned_out = true;
00289   }
00290   
00291   size_t stranded_count = 0;
00292   if (vm.count("fr-stranded")) {
00293     direction = FR;
00294     stranded_count++;
00295   }
00296   if (vm.count("rf-stranded")) {
00297     direction = RF;
00298     stranded_count++;
00299   }
00300   if (vm.count("f-stranded")) {
00301     direction = F;
00302     stranded_count++;
00303   }
00304   if (vm.count("r-stranded")) {
00305     direction = R;
00306     stranded_count++;
00307   }
00308   if (stranded_count > 1) {
00309     logger.severe("Multiple strandedness flags cannot be specified in the same "
00310                   "run.");
00311   }
00312   if (vm.count("logtostderr")) {
00313     logger.info_out(&cerr);
00314   }
00315   
00316   edit_detect = vm.count("edit-detect");
00317   calc_covar = vm.count("calc-covar");
00318   bias_correct = !(vm.count("no-bias-correct"));
00319   error_model = !(vm.count("no-error-model"));
00320   output_align_prob = vm.count("output-align-prob");
00321   output_align_samp = vm.count("output-align-samp");
00322   output_running_rounds = vm.count("output-running-rounds");
00323   output_running_reads = vm.count("output-running-reads");
00324   batch_mode = vm.count("batch-mode");
00325   both = vm.count("both");
00326   remaining_rounds = max(additional_online, additional_batch);
00327   spark_pre = vm.count("preprocess");
00328 
00329   if (batch_mode) {
00330     ff_param = 1;
00331   }
00332   
00333   if (additional_online > 0 && additional_batch > 0) {
00334     logger.severe("Cannot add both online and batch rounds.");
00335   } else if (additional_online > 0) {
00336     online_additional = true;
00337   }
00338   
00339   if (output_align_prob && output_align_samp) {
00340     logger.severe("Cannot output both alignment probabilities and sampled "
00341                   "alignments.");
00342   }
00343   if ((output_align_prob || output_align_samp) && remaining_rounds == 0) {
00344     logger.warn("It is recommended that at least one additional round "
00345                 "be used when outputting alignment probabilities or sampled "
00346                 "alignments. Use the '-B' or '-O' option to enable.");
00347   }
00348   
00349   // We have 1 processing thread and 1 parsing thread always, so we should not
00350   // count these as additional threads.
00351   if (num_threads < 2) {
00352     num_threads = 2;  // avoid unsigned underflow in the subtraction below
00353   }
00354   num_threads -= 2;
00355   if (num_threads > 0) {
00356     num_threads -= edit_detect;
00357   }
00358   if (remaining_rounds && in_map_file_names == "") {
00359     logger.severe("Cannot process multiple rounds from streaming input.");
00360   }
00361   if (remaining_rounds) {
00362     last_round = false;
00363   }
00364   if (prior_file != "") {
00365     expr_alpha_map = parse_priors(prior_file);
00366   }
00367 
00368 #ifndef WIN32
00369   if (!vm.count("no-update-check")) {
00370     check_version(PACKAGE_VERSION);
00371   }
00372 #endif
00373 
00374   return 0;
00375 }
00376 
00386 void output_results(Librarian& libs, size_t tot_counts, int n=-1) {
00387   char buff[500];
00388   string dir = output_dir;
00389   if (n >= 0) {
00390     sprintf(buff, "%s/x_%d", output_dir.c_str(), n);
00391     logger.info("Writing results to %s.", buff);
00392     dir = string(buff);
00393     try {
00394       fs::create_directories(dir);
00395     } catch (fs::filesystem_error& e) {
00396       logger.severe(e.what());
00397     }
00398   }
00399   libs[0].targ_table->output_results(dir, tot_counts, last_round&calc_covar,
00400                                      last_round&edit_detect);
00401 
00402   for (size_t l = 0; l < libs.size(); l++) {
00403     if (libs.size() > 1) {
00404       sprintf(buff, "%s/params.%d.xprs", dir.c_str(), (int)l+1);
00405     } else {
00406       sprintf(buff, "%s/params.xprs", dir.c_str());
00407     }
00408     ofstream paramfile(buff);
00409     (libs[l].fld)->append_output(paramfile, "Fragment");
00410     if (libs[l].mismatch_table) {
00411       (libs[l].mismatch_table)->append_output(paramfile);
00412     }
00413     if (libs[l].bias_table) {
00414       (libs[l].bias_table)->append_output(paramfile);
00415     }
00416     paramfile.close();
00417   }
00418 }
00419 
00427 void process_fragment(Fragment* frag_p) {
00428   Fragment& frag = *frag_p;
00429   const Library& lib = *frag.lib();
00430 
00431   // sort hits to avoid deadlock
00432   frag.sort_hits();
00433   double mass_n = frag.mass();
00434 
00435   assert(frag.num_hits());
00436 
00437   vector<double> masses(frag.num_hits(), 0);
00438   vector<double> variances(frag.num_hits(), 0);
00439   double total_likelihood = LOG_0;
00440   double total_mass = LOG_0;
00441   double total_variance = LOG_0;
00442   size_t num_solvable = 0;
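  // The likelihood, mass, and variance accumulators above are kept in log
  // space. A minimal, numerically stable sketch of the log-space addition this
  // relies on (assuming main.h's log_add(x, y) returns log(exp(x) + exp(y)),
  // as its use below suggests):
  //
  //   double log_add_sketch(double x, double y) {
  //     if (x < y) std::swap(x, y);     // keep the larger term outside the exp
  //     return x + log1p(exp(y - x));   // log(e^x + e^y) without overflow
  //   }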
00443 
00444   // set of locked targets
00445   boost::unordered_set<const Target*> targ_set;
00446   boost::unordered_set<const Target*> locked_set;
00447 
00448   // Update bundles and merge in first loop
00449   Bundle* bundle = frag.hits()[0]->target()->bundle();
00450   
00451   if (frag.num_hits() > 1) {
00452     // Calculate marginal likelihoods and lock targets.
00453     for (size_t i = 0; i < frag.num_hits(); ++i) {
00454       FragHit& m = *frag.hits()[i];
00455       Target* t = m.target();
00456       
00457       bundle = lib.targ_table->merge_bundles(bundle, t->bundle());
00458       t->bundle(bundle);
00459       
00460       if (locked_set.count(t) == 0) {
00461         t->lock();
00462         locked_set.insert(t);
00463       }
00464       targ_set = locked_set;
00465       // lock neighbors
00466       foreach (const Target* neighbor, *m.neighbors()) {
00467         if (locked_set.count(neighbor) == 0) {
00468           neighbor->lock();
00469           locked_set.insert(neighbor);
00470         }
00471       }
00472       m.params()->align_likelihood = t->align_likelihood(m);
00473       m.params()->full_likelihood = m.params()->align_likelihood +
00474                                     t->sample_likelihood(first_round,
00475                                                          m.neighbors());
00476       masses[i] = t->mass();
00477       variances[i] = t->mass_var();
00478       total_likelihood = log_add(total_likelihood, m.params()->full_likelihood);
00479       total_mass = log_add(total_mass, masses[i]);
00480       total_variance = log_add(total_variance, variances[i]);
00481       num_solvable += t->solvable();
00482       assert(!isnan(total_likelihood));
00483     }
00484   } else {
00485     FragHit& m = *frag.hits()[0];
00486     Target* t = m.target();
00487     t->lock();
00488     locked_set.insert(t);
00489     total_likelihood = 0;
00490     m.params()->align_likelihood = 0;
00491     m.params()->full_likelihood = 0;
00492     foreach (const Target* neighbor, *frag.hits()[0]->neighbors()) {
00493       if (locked_set.count(neighbor) == 0) {
00494         neighbor->lock();
00495         locked_set.insert(neighbor);
00496       }
00497     }
00498   }
00499 
00500   if (islzero(total_likelihood)){
00501     assert(expr_alpha_map);
00502     logger.warn("Fragment '%s' has 0 likelihood of originating from the "
00503                 "transcriptome. Skipping...", frag.name().c_str());
00504     foreach (const Target* t, locked_set) {
00505       t->unlock();
00506     }
00507     return;
00508   }
00509 
00510   if (first_round) {
00511     bundle->incr_counts();
00512   }
00513   if (first_round || online_additional) {
00514     bundle->incr_mass(mass_n);
00515   }
00516   
00517   // normalize marginal likelihoods
00518   for (size_t i = 0; i < frag.num_hits(); ++i) {
00519     FragHit& m = *frag[i];
00520     Target* t  = m.target();
00521     
00522     double p = m.params()->full_likelihood-total_likelihood;
00523     m.params()->posterior = p;
00524     if (targ_set.size() > 1) {
00525       double v = log_add(variances[i] - 2*total_mass,
00526                   total_variance + 2*masses[i] - 4*total_mass);
00527       t->add_hit(m, v, mass_n);
00528     } else if (i == 0) {
00529       t->add_hit(m, LOG_0, mass_n);
00530     }
00531 
00532     // update parameters
00533     if (first_round) {
00534       double r = rand()/double(RAND_MAX);
00535       
00536       if (i == 0 || frag[i-1]->target_id() != t->id()) {
00537         t->incr_counts(targ_set.size() <= 1);
00538       }
00539       if (!t->solvable() && num_solvable == frag.num_hits()-1) {
00540         t->solvable(true);
00541       }
00542       if (edit_detect && lib.mismatch_table) {
00543         (lib.mismatch_table)->update(m, p, lib.mass_n);
00544       }
00545       if (!burned_out && r < sexp(p)) {
00546         if (lib.mismatch_table && !edit_detect) {
00547           (lib.mismatch_table)->update(m, LOG_1, lib.mass_n);
00548         }
00549         if (m.pair_status() == PAIRED) {
00550           (lib.fld)->add_val(m.length(), lib.mass_n);
00551         }
00552         if (lib.bias_table) {
00553           (lib.bias_table)->update_observed(m, lib.mass_n);
00554         }
00555       }
00556     }
00557     if (calc_covar && (last_round || online_additional)) {
00558       double var = 2*mass_n + p + log_sub(LOG_1, p);
00559       lib.targ_table->update_covar(m.target_id(), m.target_id(), var);
00560       for (size_t j = i+1; j < frag.num_hits(); ++j) {
00561         const FragHit& m2 = *frag.hits()[j];
00562         double p2 = m2.params()->full_likelihood-total_likelihood;
00563         if (sexp(p2) == 0) {
00564           continue;
00565         }
00566         double covar = 2*mass_n + p + p2;
00567         lib.targ_table->update_covar(m.target_id(), m2.target_id(), covar);
00568       }
00569     }
00570   }
00571 
00572   foreach (const Target* t, locked_set) {
00573     t->unlock();
00574   }
00575 }
00576 
00583 void proc_thread(ParseThreadSafety* pts) {
00584   while (true) {
00585     Fragment* frag = pts->proc_on.pop();
00586     if (!frag) {
00587       break;
00588     }
00589     process_fragment(frag);
00590     pts->proc_out.push(frag);
00591   }
00592 }
00593 
00603 size_t threaded_calc_abundances(Librarian& libs) {
00604   logger.info("Processing input fragment alignments...");
00605   boost::scoped_ptr<boost::thread> bias_update;
00606 
00607   size_t n = 1;
00608   size_t num_frags = 0;
00609   double mass_n = 0;
00610 
00611   // For log-scale output
00612   size_t i = 1;
00613   size_t j = 6;
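  // Intermediate results are written whenever n == i*10^j, i.e. after 1e6,
  // 2e6, ..., 9e6, 1e7, 2e7, ... fragments (see the --output-running-reads
  // block below).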
00614 
00615   DirectionDetector dir_detector;
00616   Fragment* frag;
00617   
00618   while (true) {
00619     // Loop through libraries
00620     for (size_t l = 0; l < libs.size(); l++) {
00621       Library& lib = libs[l];
00622       libs.set_curr(l);
00623       MapParser& map_parser = *lib.map_parser;
00624       boost::mutex bu_mut;
00625       // Used to signal bias update thread
00626       running = true;
00627       ParseThreadSafety pts(max((int)num_threads,10));
00628       boost::thread parse(&MapParser::threaded_parse, &map_parser, &pts,
00629                           stop_at, num_neighbors);
00630       vector<boost::thread*> thread_pool;
00631       RobertsFilter frags_seen;
00632 
00633       burned_out = lib.n >= burn_out;
00634       while(true) {
00635         if (lib.n == burn_in) {
00636           bias_update.reset(new boost::thread(&TargetTable::asynch_bias_update,
00637                                               lib.targ_table, &bu_mut));
00638           if (lib.mismatch_table) {
00639             (lib.mismatch_table)->activate();
00640           }
00641         }
00642         if (lib.n == burn_out) {
00643           if (lib.mismatch_table) {
00644             (lib.mismatch_table)->fix();
00645           }
00646           burned_out = true;
00647         }
00648         // Start threads once aux parameters are burned out
00649         if (burned_out && num_threads && thread_pool.size() == 0) {
00650           lib.targ_table->enable_bundle_threadsafety();
00651           thread_pool = vector<boost::thread*>(num_threads);
00652           for (size_t k = 0; k < thread_pool.size(); k++) {
00653             thread_pool[k] = new boost::thread(proc_thread, &pts);
00654           }
00655         }
00656 
00657         // Pop next parsed fragment and set mass
00658         frag = pts.proc_in.pop();
00659         if (frag) {
00660           frag->mass(mass_n);
00661           dir_detector.add_fragment(frag);
00662         }
00663 
00664         // Test that we have not already seen this fragment
00665         if (frag && first_round && frags_seen.test_and_push(frag->name())) {
00666           logger.severe("Alignments are not properly sorted. Read '%s' has "
00667                         "alignments which are non-consecutive.",
00668                         frag->name().c_str());
00669         }
00670 
00671         // If multi-threaded and burned out, push to the processing queue
00672         if (num_threads && burned_out) {
00673           // If no more fragments, send stop signal (NULL) to processing threads
00674           if (!frag) {
00675             for (size_t k = 0; k < thread_pool.size(); ++k) {
00676               pts.proc_on.push(NULL);
00677             }
00678             break;
00679           }
00680           pts.proc_on.push(frag);
00681         } else {
00682           if (!frag) {
00683             break;
00684           }
00685           {
00686             // Block the bias update thread from updating the parameter tables
00687             // during processing. We don't need to do this during multi-threaded
00688             // processing since the parameters are burned out before we start
00689             // the threads.
00690             boost::unique_lock<boost::mutex> lock(bu_mut);
00691             process_fragment(frag);
00692             pts.proc_out.push(frag);
00693           }
00694         }
00695 
00696         // Output intermediate results, if necessary
00697         if (output_running_reads && n == i*pow(10.,(double)j)) {
00698           boost::unique_lock<boost::mutex> lock(bu_mut);
00699           output_results(libs, n, (int)n);
00700           if (i++ == 9) {
00701             i = 1;
00702             j++;
00703           }
00704         }
00705         num_frags++;
00706 
00707         // Output progress
00708         if (num_frags % 1000000 == 0) {
00709           logger.info("Fragments Processed (%s): %d\tNumber of Bundles: %d.",
00710                       lib.in_file_name.c_str(), num_frags,
00711                       lib.targ_table->num_bundles());
00712           dir_detector.report_if_improper_direction();
00713         }
00714 
00715         n++;
00716         lib.n++;
00717         mass_n += ff_param*log((double)n-1) - log(pow(n,ff_param) - 1);
00718         lib.mass_n += ff_param*log((double)lib.n-1) -
00719                       log(pow(lib.n,ff_param) - 1);
00720       }
00721 
00722       // Signal bias update thread to stop
00723       running = false;
00724 
00725       parse.join();
00726       foreach(boost::thread* t, thread_pool) {
00727         t->join();
00728       }
00729 
00730       lib.targ_table->disable_bundle_threadsafety();
00731       lib.targ_table->collapse_bundles();
00732       
00733       if (bias_update) {
00734         logger.info("Waiting for auxiliary parameter update to complete...");
00735         bias_update->join();
00736         bias_update.reset(NULL);
00737       }
00738     }
00739 
00740     if (online_additional && remaining_rounds--) {
00741       if (output_running_rounds) {
00742         output_results(libs, n, (int)remaining_rounds);
00743       }
00744 
00745       logger.info("%d remaining rounds.", remaining_rounds);
00746       first_round = false;
00747       last_round = (remaining_rounds==0 && !both);
00748       for (size_t l = 0; l < libs.size(); l++) {
00749         libs[l].map_parser->write_active(last_round);
00750         libs[l].map_parser->reset_reader();
00751       }
00752       num_frags = 0;
00753     } else {
00754       break;
00755     }
00756   }
00757 
00758   logger.info("COMPLETED: Processed %d mapped fragments, targets are in %d "
00759               "bundles.", num_frags, libs[0].targ_table->num_bundles());
00760 
00761   return num_frags;
00762 }
00763 
00769 int estimation_main() {
00770   
00771   if (output_dir != ".") {
00772     try {
00773       fs::create_directories(output_dir);
00774     } catch (fs::filesystem_error& e) {
00775       logger.info(e.what());
00776     }
00777   }
00778   
00779   if (!fs::exists(output_dir)) {
00780     logger.severe("Cannot create directory %s.", output_dir.c_str());
00781   }
00782   
00783   // Parse input file names and instantiate Library structs.
00784   vector<string> file_names;
00785   char buff[999];
00786   strcpy(buff, in_map_file_names.c_str());
00787   char * pch = strtok (buff,",");
00788   while (pch != NULL) {
00789     file_names.push_back(pch);
00790     pch = strtok (NULL, ",");
00791   }
00792   if (file_names.size() == 0) {
00793     file_names.push_back("");
00794   }
00795   Librarian libs(file_names.size());
00796   for (size_t i = 0; i < file_names.size(); ++i) {
00797     char out_map_file_name[500] = "";
00798     if (output_align_prob) {
00799       sprintf(out_map_file_name, "%s/hits.%d.prob",
00800               output_dir.c_str(), (int)i+1);
00801     }
00802     if (output_align_samp) {
00803       sprintf(out_map_file_name, "%s/hits.%d.samp",
00804               output_dir.c_str(), (int)i+1);
00805     }
00806     
00807     libs[i].in_file_name = file_names[i];
00808     libs[i].out_file_name = out_map_file_name;
00809     libs[i].map_parser.reset(new MapParser(&libs[i], last_round));
00810 
00811     if (param_file_name.size()) {
00812       libs[i].fld.reset(new LengthDistribution(param_file_name, "Fragment"));
00813       libs[i].mismatch_table.reset((error_model) ?
00814                                               new MismatchTable(param_file_name)
00815                                               : NULL);
00816       libs[i].bias_table.reset((bias_correct) ? new BiasBoss(bias_model_order,
00817                                                          param_file_name):NULL);
00818     } else {
00819       libs[i].fld.reset(new LengthDistribution(fld_alpha, def_fl_max,
00820                                                def_fl_mean, def_fl_stddev,
00821                                                def_fl_kernel_n,
00822                                                def_fl_kernel_p));
00823       libs[i].mismatch_table.reset((error_model) ? new MismatchTable(mm_alpha)
00824                                                    :NULL);
00825       libs[i].bias_table.reset((bias_correct) ? new BiasBoss(bias_model_order,
00826                                                     bias_alpha):NULL);
00827     }
00828     if (i > 0 &&
00829         (libs[i].map_parser->targ_index() != libs[i-1].map_parser->targ_index()
00830          || libs[i].map_parser->targ_lengths() !=
00831          libs[i-1].map_parser->targ_lengths())) {
00832           logger.severe("Alignment file headers do not match for '%s' and '%s'.",
00833                         file_names[i-1].c_str(), file_names[i].c_str());
00834         }
00835   }
00836   
00837   boost::shared_ptr<TargetTable> targ_table(
00838                                   new TargetTable(fasta_file_name,
00839                                                   haplotype_file_name,
00840                                                   edit_detect,
00841                                                   param_file_name.size(),
00842                                                   expr_alpha, expr_alpha_map,
00843                                                   &libs));
00844   size_t max_target_length = 0;
00845   for(size_t tid=0; tid < targ_table->size(); tid++) {
00846     max_target_length = max(max_target_length,
00847                             targ_table->get_targ(tid)->length());
00848   }
00849 
00850   for (size_t i = 0; i < libs.size(); ++i) {
00851     libs[i].targ_table = targ_table;
00852     if (bias_correct) {
00853       libs[i].bias_table->copy_expectations(*(libs.curr_lib().bias_table));
00854     }
00855   }
00856   double num_targ = (double)targ_table->size();
00857   
00858   if (calc_covar && (double)SSIZE_MAX < num_targ*(num_targ+1)) {
00859     logger.warn("Your system is unable to represent large enough values for "
00860                 "efficiently hashing target pairs. Covariance calculation will "
00861                 "be disabled.");
00862     calc_covar = false;
00863   }
00864   
00865   if (batch_mode) {
00866     targ_table->round_reset();
00867   }
00868   
00869   size_t tot_counts = threaded_calc_abundances(libs);
00870   if (library_size) {
00871     tot_counts = library_size;
00872   }
00873   
00874   if (!burned_out && bias_correct && param_file_name == "") {
00875     logger.warn("Not enough fragments observed to accurately learn bias "
00876                 "parameters. Either disable bias correction "
00877                 "(--no-bias-correct) or provide a file containing auxiliary "
00878                 "parameters (--aux-param-file).");
00879   }
00880   
00881   if (both) {
00882     remaining_rounds = 1;
00883     online_additional = false;
00884   }
00885   
00886   if (remaining_rounds) {
00887     targ_table->masses_to_counts();
00888   }
00889   
00890   targ_table->round_reset();
00891   ff_param = 1.0;
00892 
00893   first_round = false;
00894   
00895   while (!last_round) {
00896     if (output_running_rounds) {
00897       output_results(libs, tot_counts, (int)remaining_rounds);
00898     }
00899     remaining_rounds--;
00900     logger.info("\nRe-estimating counts with additional round of EM (%d "
00901                 "remaining)...", remaining_rounds);
00902     last_round = (remaining_rounds == 0);
00903     for (size_t l = 0; l < libs.size(); l++) {
00904       libs[l].map_parser->write_active(last_round);
00905       libs[l].map_parser->reset_reader();
00906     }
00907     tot_counts = threaded_calc_abundances(libs);
00908     if (library_size) {
00909       tot_counts = library_size;
00910     }
00911     targ_table->round_reset();
00912   }
00913   
00914   logger.info("Writing results to file...");
00915   output_results(libs, tot_counts);
00916   logger.info("Done.");
00917   
00918   return 0;
00919 }
00920 
00921 #ifdef PROTO
00922 inline string base64_encode(const string& to_encode) {
00923   using namespace boost::archive::iterators;
00924   typedef base64_from_binary<transform_width<string::const_iterator,6,8> > it_base64_t;
00925   unsigned int writePaddChars = (3-to_encode.length()%3)%3;
00926   string base64(it_base64_t(to_encode.begin()), it_base64_t(to_encode.end()));
00927   base64.append(writePaddChars,'=');
00928   return base64;
00929 }
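// For example, base64_encode("abc") yields "YWJj"; inputs whose length is not
// a multiple of three are padded with '=' so the encoded length is always a
// multiple of four.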
00930 
00931 int preprocess_main() {
00932   try {
00933     fs::create_directories(output_dir);
00934   } catch (fs::filesystem_error& e) {
00935       logger.info(e.what());
00936   }
00937   
00938   if (!fs::exists(output_dir)) {
00939     logger.severe("Cannot create directory %s.", output_dir.c_str());
00940   }
00941   
00942   Librarian libs(1);
00943   Library& lib = libs[0];
00944   lib.in_file_name = in_map_file_names;
00945   lib.out_file_name = "";
00946   
00947   lib.map_parser.reset(new MapParser (&lib, false));
00948   lib.fld.reset(new LengthDistribution(0, 0, 0, 1, 2, 0));
00949   MarkovModel bias_model(3, 21, 21, 0);
00950   MismatchTable mismatch_table(0);
00951   lib.targ_table.reset(new TargetTable(fasta_file_name, "", 0, 0, 0.0, NULL,
00952                                        &libs));
00953   
00954   logger.info("Converting targets to Protocol Buffers...");
00955   fstream targ_out((output_dir + "/targets.pb").c_str(),
00956                    ios::out | ios::trunc);
00957   string out_buff;
00958   proto::Target target_proto;
00959   for (TargID id = 0; id < lib.targ_table->size(); ++id) {
00960     target_proto.Clear();
00961     Target& targ = *lib.targ_table->get_targ(id);
00962     target_proto.set_name(targ.name());
00963     target_proto.set_id((unsigned int)targ.id());
00964     target_proto.set_length((unsigned int)targ.length());
00965     target_proto.set_seq(targ.seq(0).serialize());
00966     
00967     target_proto.SerializeToString(&out_buff);
00968     targ_out << base64_encode(out_buff) << endl;
00969   }
00970   targ_out.close();
00971   
00972   logger.info("Converting fragment alignments to Protocol Buffers...");
00973   ostream frag_out(cout.rdbuf());
00974   
00975   size_t num_frags = 0;
00976   Fragment* frag;
00977   
00978   ParseThreadSafety pts(10);
00979   boost::thread parse(&MapParser::threaded_parse, lib.map_parser.get(), &pts,
00980                       stop_at, 0);
00981   RobertsFilter frags_seen;
00982   proto::Fragment frag_proto;
00983   while(true) {
00984     frag_proto.Clear();
00985     
00986     // Pop next parsed fragment and set mass
00987     frag = pts.proc_in.pop();
00988     
00989     if (!frag) {
00990       break;
00991     }
00992     
00993     // Test that we have not already seen this fragment
00994     if (frags_seen.test_and_push(frag->name())) {
00995       logger.severe("Alignments are not properly sorted. Read '%s' has "
00996                     "alignments which are non-consecutive.",
00997                     frag->name().c_str());
00998     }
00999     
01000     frag_proto.set_paired(frag->paired());
01001     
01002     vector<char> ref_left_mm_indices;
01003     vector<char> ref_left_mm_seq;
01004     vector<char> ref_left_mm_ref;
01005     vector<char> ref_right_mm_indices;
01006     vector<char> ref_right_mm_seq;
01007     vector<char> ref_right_mm_ref;
01008     bool ref_left_first = false;
01009     
01010     for (size_t i = 0; i < frag->num_hits(); ++i) {
01011       FragHit& fh = *(*frag)[i];
01012       proto::FragmentAlignment& align_proto = *frag_proto.add_alignments();
01013       align_proto.set_target_id((unsigned int)fh.target_id());
01014       
01015       vector<char> left_mm_indices;
01016       vector<char> left_mm_seq;
01017       vector<char> left_mm_ref;
01018       vector<char> right_mm_indices;
01019       vector<char> right_mm_seq;
01020       vector<char> right_mm_ref;
01021       
01022       mismatch_table.get_indices(fh, left_mm_indices, left_mm_seq, left_mm_ref,
01023                                  right_mm_indices, right_mm_seq, right_mm_ref);
01024       
01025       if (i == 0) {
01026         ref_left_mm_indices = left_mm_indices;
01027         ref_left_mm_seq = left_mm_seq;
01028         ref_left_mm_ref = left_mm_ref;
01029         ref_right_mm_indices = right_mm_indices;
01030         ref_right_mm_seq = right_mm_seq;
01031         ref_right_mm_ref = right_mm_ref;
01032       }
01033       
01034       ReadHit* read_l = fh.left_read();
01035       if (read_l) {
01036         if (i==0) { ref_left_first = read_l->first; }
01037         proto::ReadAlignment& read_proto = *align_proto.mutable_read_l();
01038         read_proto.set_first(read_l->first);
01039         read_proto.set_left_pos(read_l->left);
01040         read_proto.set_right_pos(read_l->right-1);
01041         if (i == 0 || read_l->first != ref_left_first ||
01042             left_mm_indices != ref_left_mm_indices ||
01043             left_mm_seq != ref_left_mm_seq ||
01044             left_mm_ref != ref_left_mm_ref) {
01045           read_proto.set_mismatch_indices(string(left_mm_indices.begin(),
01046                                                  left_mm_indices.end()));
01047           read_proto.set_mismatch_nucs(string(left_mm_seq.begin(),
01048                                               left_mm_seq.end()));
01049         }
01050       }
01051       
01052       ReadHit* read_r = fh.right_read();
01053       if (read_r) {
01054         if (i==0) { ref_left_first = !read_r->first; }
01055         proto::ReadAlignment& read_proto = *align_proto.mutable_read_r();
01056         read_proto.set_first(read_r->first);
01057         read_proto.set_left_pos(read_r->left);
01058         read_proto.set_right_pos(read_r->right-1);
01059         if (i == 0 || read_r->first == ref_left_first ||
01060             right_mm_indices != ref_right_mm_indices ||
01061             right_mm_seq != ref_right_mm_seq ||
01062             right_mm_ref != ref_right_mm_ref) {
01063           read_proto.set_mismatch_indices(string(right_mm_indices.begin(),
01064                                                  right_mm_indices.end()));
01065           read_proto.set_mismatch_nucs(string(right_mm_seq.begin(),
01066                                               right_mm_seq.end()));
01067         }
01068       }
01069     }
01070     frag_proto.SerializeToString(&out_buff);
01071     frag_out << base64_encode(out_buff) << endl;
01072     
01073     pts.proc_out.push(frag);
01074     
01075     num_frags++;
01076     
01077     // Output progress
01078     if (num_frags % 1000000 == 0) {
01079       logger.info("Fragments Processed: %d", num_frags);
01080     }
01081   }
01082   
01083   parse.join();
01084   
01085   return 0;
01086 }
01087 #endif
01088 
01089 
01090 int main (int argc, char ** argv)
01091 {
01092 
01093   srand((unsigned int)time(NULL));
01094   int parse_ret = parse_options(argc,argv);
01095   if (parse_ret) {
01096     return parse_ret;
01097   }
01098   
01099 #ifdef PROTO
01100   if (spark_pre) {
01101     return preprocess_main();
01102   }
01103 #endif
01104   
01105   return estimation_main();
01106 }