eXpress 1.5 — main.cpp (extracted source listing)
// Third-party (Boost) containers, filesystem, CLI parsing, and threading.
#include <boost/unordered_map.hpp>
#include <boost/unordered_set.hpp>
#include <boost/filesystem.hpp>
#include <boost/program_options.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <iomanip>
#include <fstream>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
// Project headers: core data structures and processing stages of eXpress.
#include "main.h"
#include "bundles.h"
#include "targets.h"
#include "lengthdistribution.h"
#include "fragments.h"
#include "biascorrection.h"
#include "mismatchmodel.h"
#include "mapparser.h"
#include "threadsafety.h"
#include "robertsfilter.h"
#include "directiondetector.h"
#include "library.h"

// Protocol Buffer support for the eXpressD (Spark) preprocessor is only
// compiled in when PROTO is defined; the .proto-generated headers are
// injected via the PROTO_*_INCL macros by the build system.
#ifdef PROTO
#include PROTO_ALIGNMENT_INCL
#include PROTO_TARGET_INCL
#include <boost/archive/iterators/base64_from_binary.hpp>
#include <boost/archive/iterators/transform_width.hpp>
#include <boost/archive/iterators/ostream_iterator.hpp>
#endif

// The web-based update check is not available on Windows builds.
#ifndef WIN32
#include "update_check.h"
#endif

using namespace std;
namespace po = boost::program_options;
namespace fs = boost::filesystem;

// Global logger used by every subsystem in this translation unit.
Logger logger;

// the forgetting factor parameter controls the growth of the fragment mass
double ff_param = 0.85;

// the burn-in parameter determines how many reads are required before the
// error and bias models are applied to probabilistic assignment
size_t burn_in = 100000;
// after burn_out fragments, auxiliary (error/bias/FLD) parameters are frozen
size_t burn_out = 5000000;
bool burned_out = false;

// maximum allowed read length (parser-level cap)
size_t max_read_len = 250;

// maximum allowed indel size, affecting the geometric indel prior
size_t max_indel_size = 10;

// number of fragments to process before stopping; 0 disables the limit
size_t stop_at = 0;

// file location parameters
string output_dir = ".";
string fasta_file_name = "";
string in_map_file_names = "";
string param_file_name = "";
string haplotype_file_name = "";

// initial pseudo-count parameters (non-logged)
double expr_alpha = .005; 00075 double fld_alpha = 1; 00076 double bias_alpha = 1; 00077 double mm_alpha = 1; 00078 00079 size_t bias_model_order = 3; 00080 00081 // fragment length parameters 00082 size_t def_fl_max = 800; 00083 size_t def_fl_mean = 200; 00084 size_t def_fl_stddev = 80; 00085 size_t def_fl_kernel_n = 4; 00086 double def_fl_kernel_p = 0.5; 00087 00088 // option parameters 00089 bool edit_detect = false; 00090 bool error_model = true; 00091 bool bias_correct = true; 00092 bool calc_covar = false; 00093 bool output_align_prob = false; 00094 bool output_align_samp = false; 00095 bool output_running_rounds = false; 00096 bool output_running_reads = false; 00097 size_t num_threads = 2; 00098 size_t num_neighbors = 0; 00099 size_t library_size = 0; 00100 00101 // directional parameters 00102 Direction direction = BOTH; 00103 00104 bool running = true; 00105 00106 // used for multiple rounds of EM 00107 bool first_round = true; 00108 bool last_round = true; 00109 bool batch_mode = false; 00110 bool online_additional = false; 00111 bool both = false; 00112 size_t remaining_rounds = 0; 00113 00114 bool spark_pre = false; 00115 00116 typedef boost::unordered_map<string, double> AlphaMap; 00117 AlphaMap* expr_alpha_map = NULL; 00118 00124 AlphaMap* parse_priors(string in_file) { 00125 ifstream ifs(in_file.c_str()); 00126 if (!ifs.is_open()) { 00127 logger.severe("Unable to open input priors file '%s'.", in_file.c_str()); 00128 } 00129 AlphaMap* alphas = new AlphaMap(); 00130 00131 string line; 00132 00133 while(ifs.good()) { 00134 getline(ifs,line); 00135 00136 size_t idx = line.find_first_of("\t "); 00137 if (idx!=string::npos) { 00138 string name = line.substr(0,idx); 00139 string val = line.substr(idx+1); 00140 (*alphas)[name] = atof(val.c_str()); 00141 } 00142 } 00143 return alphas; 00144 }; 00145 00152 bool parse_options(int ac, char ** av) { 00153 00154 size_t additional_online = 0; 00155 size_t additional_batch = 0; 00156 00157 po::options_description 
standard("Standard Options"); 00158 standard.add_options() 00159 ("help,h", "produce help message") 00160 ("output-dir,o", po::value<string>(&output_dir)->default_value(output_dir), 00161 "write all output files to this directory") 00162 #ifdef PROTO 00163 ("preprocess,D", "run preprocess script for eXpressD") 00164 #endif 00165 ("frag-len-mean,m", po::value<size_t>(&def_fl_mean)->default_value(def_fl_mean), 00166 "prior estimate for average fragment length") 00167 ("frag-len-stddev,s", 00168 po::value<size_t>(&def_fl_stddev)->default_value(def_fl_stddev), 00169 "prior estimate for fragment length std deviation") 00170 ("haplotype-file,H", 00171 po::value<string>(&haplotype_file_name)->default_value(haplotype_file_name), 00172 "path to a file containing haplotype pairs") 00173 ("additional-batch,B", 00174 po::value<size_t>(&additional_batch)->default_value(additional_batch), 00175 "number of additional batch EM rounds after initial online round") 00176 ("additional-online,O", 00177 po::value<size_t>(&additional_online)->default_value(additional_online), 00178 "number of additional online EM rounds after initial online round") 00179 ("max-read-len,L", 00180 po::value<size_t>(&max_read_len)->default_value(max_read_len), 00181 "maximum allowed length of a read") 00182 ("output-align-prob", 00183 "output alignments (sam/bam) with probabilistic assignments") 00184 ("output-align-samp", 00185 "output alignments (sam/bam) with sampled assignments") 00186 ("fr-stranded", 00187 "accept only forward->reverse alignments (second-stranded protocols)") 00188 ("rf-stranded", 00189 "accept only reverse->forward alignments (first-stranded protocols)") 00190 ("f-stranded", 00191 "accept only forward single-end alignments (second-stranded protocols)") 00192 ("r-stranded", 00193 "accept only reverse single-end alignments (first-stranded protocols)") 00194 ("no-update-check", "disables automatic check for update via web") 00195 ("logtostderr", "prints all logging messages to stderr") 
00196 ; 00197 00198 po::options_description advanced("Advanced Options"); 00199 advanced.add_options() 00200 ("forget-param,f", po::value<double>(&ff_param)->default_value(ff_param), 00201 "sets the 'forgetting factor' parameter (0.5 < c <= 1)") 00202 ("library-size", po::value<size_t>(&library_size), 00203 "specifies library size for FPKM instead of calculating from alignments") 00204 ("max-indel-size", 00205 po::value<size_t>(&max_indel_size)->default_value(max_indel_size), 00206 "sets the maximum allowed indel size, affecting geometric indel prior") 00207 ("calc-covar", "calculate and output covariance matrix") 00208 ("expr-alpha", po::value<double>(&expr_alpha)->default_value(expr_alpha), 00209 "sets the strength of the prior, per bp") 00210 ("stop-at", po::value<size_t>(&stop_at)->default_value(stop_at), 00211 "sets the number of fragments to process, disabled with 0") 00212 ("burn-out", po::value<size_t>(&burn_out)->default_value(burn_out), 00213 "sets number of fragments after which to stop updating auxiliary parameters") 00214 ("no-bias-correct", "disables bias correction") 00215 ("no-error-model", "disables error modelling") 00216 ("aux-param-file", 00217 po::value<string>(¶m_file_name)->default_value(param_file_name), 00218 "path to file containing auxiliary parameters to use instead of learning") 00219 ; 00220 00221 string prior_file = ""; 00222 00223 po::options_description hidden("Experimental/Debug Options"); 00224 hidden.add_options() 00225 ("num-threads,p", po::value<size_t>(&num_threads)->default_value(num_threads), 00226 "number of threads (>= 2)") 00227 ("edit-detect","") 00228 ("single-round", "") 00229 ("output-running-rounds", "") 00230 ("output-running-reads", "") 00231 ("batch-mode","") 00232 ("both","") 00233 ("prior-params", po::value<string>(&prior_file)->default_value(""), "") 00234 ("sam-file", po::value<string>(&in_map_file_names)->default_value(""), "") 00235 ("fasta-file", po::value<string>(&fasta_file_name)->default_value(""), "") 
00236 ("num-neighbors", po::value<size_t>(&num_neighbors)->default_value(0), "") 00237 ("bias-model-order", 00238 po::value<size_t>(&bias_model_order)->default_value(bias_model_order), 00239 "sets the order of the Markov chain used to model sequence bias") 00240 ; 00241 00242 po::positional_options_description positional; 00243 positional.add("fasta-file",1).add("sam-file",1); 00244 00245 po::options_description cmdline_options; 00246 cmdline_options.add(standard).add(advanced).add(hidden); 00247 00248 bool error = false; 00249 po::variables_map vm; 00250 try { 00251 po::store(po::command_line_parser(ac, av).options(cmdline_options) 00252 .positional(positional).run(), vm); 00253 } catch (po::error& e) { 00254 logger.info("Command-Line Argument Error: %s.", e.what()); 00255 error = true; 00256 } 00257 po::notify(vm); 00258 00259 if (ff_param > 1.0 || ff_param < 0.5) { 00260 logger.info("Command-Line Argument Error: forget-param/f option must be " 00261 "between 0.5 and 1.0."); 00262 error= true; 00263 } 00264 00265 if (fasta_file_name == "") { 00266 logger.info("Command-Line Argument Error: target sequence fasta file " 00267 "required."); 00268 error = true; 00269 } 00270 00271 if (error || vm.count("help")) { 00272 cerr << "express v" << PACKAGE_VERSION << endl 00273 << "-----------------------------\n" 00274 << "File Usage: express [options] <target_seqs.fa> <hits.(sam/bam)>\n" 00275 << "Piped Usage: bowtie [options] -S <index> <reads.fq> | express " 00276 << "[options] <target_seqs.fa>\n\n" 00277 << "Required arguments:\n" 00278 << " <target_seqs.fa> target sequence file in fasta format\n" 00279 << " <hits.(sam/bam)> read alignment file in SAM or BAM format\n\n" 00280 << standard 00281 << advanced; 00282 return 1; 00283 } 00284 00285 if (param_file_name.size()) { 00286 burn_in = 0; 00287 burn_out = 0; 00288 burned_out = true; 00289 } 00290 00291 size_t stranded_count = 0; 00292 if (vm.count("fr-stranded")) { 00293 direction = FR; 00294 stranded_count++; 00295 } 
00296 if (vm.count("rf-stranded")) { 00297 direction = RF; 00298 stranded_count++; 00299 } 00300 if (vm.count("f-stranded")) { 00301 direction = F; 00302 stranded_count++; 00303 } 00304 if (vm.count("r-stranded")) { 00305 direction = R; 00306 stranded_count++; 00307 } 00308 if (stranded_count > 1) { 00309 logger.severe("Multiple strandedness flags cannot be specified in the same " 00310 "run."); 00311 } 00312 if (vm.count("logtostderr")) { 00313 logger.info_out(&cerr); 00314 } 00315 00316 edit_detect = vm.count("edit-detect"); 00317 calc_covar = vm.count("calc-covar"); 00318 bias_correct = !(vm.count("no-bias-correct")); 00319 error_model = !(vm.count("no-error-model")); 00320 output_align_prob = vm.count("output-align-prob"); 00321 output_align_samp = vm.count("output-align-samp"); 00322 output_running_rounds = vm.count("output-running-rounds"); 00323 output_running_reads = vm.count("output-running-reads"); 00324 batch_mode = vm.count("batch-mode"); 00325 both = vm.count("both"); 00326 remaining_rounds = max(additional_online, additional_batch); 00327 spark_pre = vm.count("preprocess"); 00328 00329 if (batch_mode) { 00330 ff_param = 1; 00331 } 00332 00333 if (additional_online > 0 && additional_batch > 0) { 00334 logger.severe("Cannot add both online and batch rounds."); 00335 } else if (additional_online > 0) { 00336 online_additional = true; 00337 } 00338 00339 if (output_align_prob && output_align_samp) { 00340 logger.severe("Cannot output both alignment probabilties and sampled " 00341 "alignments."); 00342 } 00343 if ((output_align_prob || output_align_samp) && remaining_rounds == 0) { 00344 logger.warn("It is recommended that at least one additional round " 00345 "be used when outputting alignment probabilities or sampled " 00346 "alignments. Use the '-B' or '-O' option to enable."); 00347 } 00348 00349 // We have 1 processing thread and 1 parsing thread always, so we should not 00350 // count these as additional threads. 
00351 if (num_threads < 2) { 00352 num_threads = 0; 00353 } 00354 num_threads -= 2; 00355 if (num_threads > 0) { 00356 num_threads -= edit_detect; 00357 } 00358 if (remaining_rounds && in_map_file_names == "") { 00359 logger.severe("Cannot process multiple rounds from streaming input."); 00360 } 00361 if (remaining_rounds) { 00362 last_round = false; 00363 } 00364 if (prior_file != "") { 00365 expr_alpha_map = parse_priors(prior_file); 00366 } 00367 00368 #ifndef WIN32 00369 if (!vm.count("no-update-check")) { 00370 check_version(PACKAGE_VERSION); 00371 } 00372 #endif 00373 00374 return 0; 00375 } 00376 00386 void output_results(Librarian& libs, size_t tot_counts, int n=-1) { 00387 char buff[500]; 00388 string dir = output_dir; 00389 if (n >= 0) { 00390 sprintf(buff, "%s/x_%d", output_dir.c_str(), n); 00391 logger.info("Writing results to %s.", buff); 00392 dir = string(buff); 00393 try { 00394 fs::create_directories(dir); 00395 } catch (fs::filesystem_error& e) { 00396 logger.severe(e.what()); 00397 } 00398 } 00399 libs[0].targ_table->output_results(dir, tot_counts, last_round&calc_covar, 00400 last_round&edit_detect); 00401 00402 for (size_t l = 0; l < libs.size(); l++) { 00403 if (libs.size() > 1) { 00404 sprintf(buff, "%s/params.%d.xprs", dir.c_str(), (int)l+1); 00405 } else { 00406 sprintf(buff, "%s/params.xprs", dir.c_str()); 00407 } 00408 ofstream paramfile(buff); 00409 (libs[l].fld)->append_output(paramfile, "Fragment"); 00410 if (libs[l].mismatch_table) { 00411 (libs[l].mismatch_table)->append_output(paramfile); 00412 } 00413 if (libs[l].bias_table) { 00414 (libs[l].bias_table)->append_output(paramfile); 00415 } 00416 paramfile.close(); 00417 } 00418 } 00419 00427 void process_fragment(Fragment* frag_p) { 00428 Fragment& frag = *frag_p; 00429 const Library& lib = *frag.lib(); 00430 00431 // sort hits to avoid deadlock 00432 frag.sort_hits(); 00433 double mass_n = frag.mass(); 00434 00435 assert(frag.num_hits()); 00436 00437 vector<double> 
masses(frag.num_hits(), 0);
  vector<double> variances(frag.num_hits(), 0);
  // All accumulators below are in log space (LOG_0 is the log of zero).
  double total_likelihood = LOG_0;
  double total_mass = LOG_0;
  double total_variance = LOG_0;
  size_t num_solvable = 0;

  // set of locked targets
  boost::unordered_set<const Target*> targ_set;
  boost::unordered_set<const Target*> locked_set;

  // Update bundles and merge in first loop
  Bundle* bundle = frag.hits()[0]->target()->bundle();

  if (frag.num_hits() > 1) {
    // Calculate marginal likelihoods and lock targets.
    for (size_t i = 0; i < frag.num_hits(); ++i) {
      FragHit& m = *frag.hits()[i];
      Target* t = m.target();

      // All targets hit by one fragment must share a bundle.
      bundle = lib.targ_table->merge_bundles(bundle, t->bundle());
      t->bundle(bundle);

      if (locked_set.count(t) == 0) {
        t->lock();
        locked_set.insert(t);
      }
      // NOTE(review): copies locked_set on every iteration; targ_set ends up
      // holding only hit targets (not neighbors) because the snapshot is
      // taken before the neighbor-locking loop below — confirm intent.
      targ_set = locked_set;
      // lock neighbors
      foreach (const Target* neighbor, *m.neighbors()) {
        if (locked_set.count(neighbor) == 0) {
          neighbor->lock();
          locked_set.insert(neighbor);
        }
      }
      m.params()->align_likelihood = t->align_likelihood(m);
      m.params()->full_likelihood = m.params()->align_likelihood +
                                    t->sample_likelihood(first_round,
                                                         m.neighbors());
      masses[i] = t->mass();
      variances[i] = t->mass_var();
      total_likelihood = log_add(total_likelihood, m.params()->full_likelihood);
      total_mass = log_add(total_mass, masses[i]);
      total_variance = log_add(total_variance, variances[i]);
      num_solvable += t->solvable();
      assert(!isnan(total_likelihood));
    }
  } else {
    // Single hit: the assignment is certain, so likelihoods are log(1) == 0.
    FragHit& m = *frag.hits()[0];
    Target* t = m.target();
    t->lock();
    locked_set.insert(t);
    total_likelihood = 0;
    m.params()->align_likelihood = 0;
    m.params()->full_likelihood = 0;
    foreach (const Target* neighbor, *frag.hits()[0]->neighbors()) {
      if (targ_set.count(neighbor) == 0) {
        neighbor->lock();
        locked_set.insert(neighbor);
      }
    }
  }

  // A zero total likelihood can only occur with user-supplied priors.
  if (islzero(total_likelihood)){
    assert(expr_alpha_map);
    logger.warn("Fragment '%s' has 0 likelihood of originating from the "
                "transcriptome. Skipping...", frag.name().c_str());
    foreach (const Target* t, locked_set) {
      t->unlock();
    }
    return;
  }

  if (first_round) {
    bundle->incr_counts();
  }
  if (first_round || online_additional) {
    bundle->incr_mass(mass_n);
  }

  // normalize marginal likelihoods
  for (size_t i = 0; i < frag.num_hits(); ++i) {
    FragHit& m = *frag[i];
    Target* t = m.target();

    // p is the (log) posterior probability of this alignment.
    double p = m.params()->full_likelihood-total_likelihood;
    m.params()->posterior = p;
    if (targ_set.size() > 1) {
      double v = log_add(variances[i] - 2*total_mass,
                         total_variance + 2*masses[i] - 4*total_mass);
      t->add_hit(m, v, mass_n);
    } else if (i == 0) {
      // Unambiguous fragment: zero variance contribution.
      t->add_hit(m, LOG_0, mass_n);
    }

    // update parameters
    if (first_round) {
      // Uniform draw used below to sample whether this hit updates the
      // auxiliary models (proportional to its posterior).
      double r = rand()/double(RAND_MAX);

      // Hits are sorted, so a new target id marks the first hit to t.
      if (i == 0 || frag[i-1]->target_id() != t->id()) {
        t->incr_counts(targ_set.size() <= 1);
      }
      if (!t->solvable() && num_solvable == frag.num_hits()-1) {
        t->solvable(true);
      }
      if (edit_detect && lib.mismatch_table) {
        (lib.mismatch_table)->update(m, p, lib.mass_n);
      }
      if (!burned_out && r < sexp(p)) {
        if (lib.mismatch_table && !edit_detect) {
          (lib.mismatch_table)->update(m, LOG_1, lib.mass_n);
        }
        if (m.pair_status() == PAIRED) {
          (lib.fld)->add_val(m.length(), lib.mass_n);
        }
        if (lib.bias_table) {
          (lib.bias_table)->update_observed(m, lib.mass_n);
        }
      }
    }
    if (calc_covar && (last_round || online_additional)) {
      double var = 2*mass_n + p + log_sub(LOG_1, p);
      lib.targ_table->update_covar(m.target_id(), m.target_id(), var);
      for (size_t j = i+1; j < frag.num_hits(); ++j) {
        const FragHit& m2 = *frag.hits()[j];
        double p2 = m2.params()->full_likelihood-total_likelihood;
        if (sexp(p2) == 0) {
          continue;
        }
        double covar = 2*mass_n + p + p2;
        lib.targ_table->update_covar(m.target_id(), m2.target_id(), covar);
      }
    }
  }

  // Release every target (and neighbor) locked above.
  foreach (const Target* t, locked_set) {
    t->unlock();
  }
}

/**
 * Worker-thread loop: pops parsed fragments from the processing queue until
 * a NULL sentinel arrives, processing each and forwarding it to the output
 * queue.
 * @param pts the shared thread-safety structure holding the queues.
 */
void proc_thread(ParseThreadSafety* pts) {
  while (true) {
    Fragment* frag = pts->proc_on.pop();
    if (!frag) {
      break;
    }
    process_fragment(frag);
    pts->proc_out.push(frag);
  }
}

/**
 * Main driver of the online EM: parses alignments, burns in/out the auxiliary
 * models, dispatches fragments to worker threads, and handles additional
 * online rounds.
 * @param libs the Librarian holding all libraries to process.
 * @return the number of fragments processed in the final pass.
 */
size_t threaded_calc_abundances(Librarian& libs) {
  logger.info("Processing input fragment alignments...");
  boost::scoped_ptr<boost::thread> bias_update;

  size_t n = 1;
  size_t num_frags = 0;
  double mass_n = 0;

  // For log-scale output
  size_t i = 1;
  size_t j = 6;

  DirectionDetector dir_detector;
  Fragment* frag;

  while (true) {
    // Loop through libraries
    for (size_t l = 0; l < libs.size(); l++) {
      Library& lib = libs[l];
      libs.set_curr(l);
      MapParser& map_parser = *lib.map_parser;
      boost::mutex bu_mut;
      // Used to signal bias update thread
      running = true;
      ParseThreadSafety pts(max((int)num_threads,10));
      boost::thread parse(&MapParser::threaded_parse, &map_parser, &pts,
                          stop_at, num_neighbors);
      vector<boost::thread*> thread_pool;
      RobertsFilter frags_seen;

      burned_out = lib.n >= burn_out;
      while(true) {
        // At burn-in, start the asynchronous bias updater and enable the
        // error model.
        if (lib.n == burn_in) {
          bias_update.reset(new boost::thread(&TargetTable::asynch_bias_update,
                                              lib.targ_table, &bu_mut));
          if (lib.mismatch_table) {
            (lib.mismatch_table)->activate();
          }
        }
        // At burn-out, freeze the auxiliary parameters.
        if (lib.n == burn_out) {
          if (lib.mismatch_table) {
(lib.mismatch_table)->fix();
          };
          burned_out = true;
        }
        // Start threads once aux parameters are burned out
        if (burned_out && num_threads && thread_pool.size() == 0) {
          lib.targ_table->enable_bundle_threadsafety();
          thread_pool = vector<boost::thread*>(num_threads);
          for (size_t k = 0; k < thread_pool.size(); k++) {
            thread_pool[k] = new boost::thread(proc_thread, &pts);
          }
        }

        // Pop next parsed fragment and set mass
        frag = pts.proc_in.pop();
        if (frag) {
          frag->mass(mass_n);
          dir_detector.add_fragment(frag);
        }

        // Test that we have not already seen this fragment
        if (frag && first_round && frags_seen.test_and_push(frag->name())) {
          logger.severe("Alignments are not properly sorted. Read '%s' has "
                        "alignments which are non-consecutive.",
                        frag->name().c_str());
        }

        // If multi-threaded and burned out, push to the processing queue
        if (num_threads && burned_out) {
          // If no more fragments, send stop signal (NULL) to processing threads
          if (!frag) {
            for (size_t k = 0; k < thread_pool.size(); ++k) {
              pts.proc_on.push(NULL);
            }
            break;
          }
          pts.proc_on.push(frag);
        } else {
          if (!frag) {
            break;
          }
          {
            // Block the bias update thread from updating the parameter tables
            // during processing. We don't need to do this during multi-threaded
            // processing since the parameters are burned out before we start
            // the threads.
            boost::unique_lock<boost::mutex> lock(bu_mut);
            process_fragment(frag);
            pts.proc_out.push(frag);
          }
        }

        // Output intermediate results, if necessary
        // (fires at fragment numbers 1e6, 2e6, ..., 9e6, 1e7, ...)
        if (output_running_reads && n == i*pow(10.,(double)j)) {
          boost::unique_lock<boost::mutex> lock(bu_mut);
          // NOTE(review): passes n as the tot_counts argument, whereas other
          // call sites pass a fragment count — confirm this is intended.
          output_results(libs, n, (int)n);
          if (i++ == 9) {
            i = 1;
            j++;
          }
        }
        num_frags++;

        // Output progress
        if (num_frags % 1000000 == 0) {
          logger.info("Fragments Processed (%s): %d\tNumber of Bundles: %d.",
                      lib.in_file_name.c_str(), num_frags,
                      lib.targ_table->num_bundles());
          dir_detector.report_if_improper_direction();
        }

        n++;
        lib.n++;
        // Online-EM forgetting-mass update (log space); n >= 2 here, so the
        // log((double)n-1) argument is strictly positive.
        mass_n += ff_param*log((double)n-1) - log(pow(n,ff_param) - 1);
        lib.mass_n += ff_param*log((double)lib.n-1) -
                      log(pow(lib.n,ff_param) - 1);
      }

      // Signal bias update thread to stop
      running = false;

      parse.join();
      foreach(boost::thread* t, thread_pool) {
        t->join();
      }

      lib.targ_table->disable_bundle_threadsafety();
      lib.targ_table->collapse_bundles();

      if (bias_update) {
        logger.info("Waiting for auxiliary parameter update to complete...");
        bias_update->join();
        bias_update.reset(NULL);
      }
    }

    // Additional online rounds re-stream the input until none remain.
    if (online_additional && remaining_rounds--) {
      if (output_running_rounds) {
        output_results(libs, n, (int)remaining_rounds);
      }

      logger.info("%d remaining rounds.", remaining_rounds);
      first_round = false;
      last_round = (remaining_rounds==0 && !both);
      for (size_t l = 0; l < libs.size(); l++) {
        libs[l].map_parser->write_active(last_round);
        libs[l].map_parser->reset_reader();
      }
      num_frags = 0;
    } else {
      break;
    }
  }

  logger.info("COMPLETED: Processed %d mapped fragments, targets are in %d "
              "bundles.", num_frags, libs[0].targ_table->num_bundles());

  return num_frags;
}

/**
 * Entry point for abundance estimation: sets up the output directory,
 * libraries, and target table, runs the (possibly multi-round) EM, and
 * writes the final results.
 * @return 0 on success (errors are fatal via logger.severe).
 */
int estimation_main() {

  if (output_dir != ".") {
    try {
      fs::create_directories(output_dir);
    } catch (fs::filesystem_error& e) {
      logger.info(e.what());
    }
  }

  if (!fs::exists(output_dir)) {
    logger.severe("Cannot create directory %s.", output_dir.c_str());
  }

  // Parse input file names and instantiate Library structs.
  vector<string> file_names;
  char buff[999];
  strcpy(buff, in_map_file_names.c_str());
  char * pch = strtok (buff,",");
  while (pch != NULL) {
    file_names.push_back(pch);
    pch = strtok (NULL, ",");
  }
  if (file_names.size() == 0) {
    file_names.push_back("");
  }
  Librarian libs(file_names.size());
  for (size_t i = 0; i < file_names.size(); ++i) {
    char out_map_file_name[500] = "";
    if (output_align_prob) {
      sprintf(out_map_file_name, "%s/hits.%d.prob",
              output_dir.c_str(), (int)i+1);
    }
    if (output_align_samp) {
      sprintf(out_map_file_name, "%s/hits.%d.samp",
              output_dir.c_str(), (int)i+1);
    }

    libs[i].in_file_name = file_names[i];
    libs[i].out_file_name = out_map_file_name;
    libs[i].map_parser.reset(new MapParser(&libs[i], last_round));

    // Either load all auxiliary parameters from file, or learn them.
    if (param_file_name.size()) {
      libs[i].fld.reset(new LengthDistribution(param_file_name, "Fragment"));
      libs[i].mismatch_table.reset((error_model) ?
                                   new MismatchTable(param_file_name)
                                   : NULL);
      libs[i].bias_table.reset((bias_correct) ? new BiasBoss(bias_model_order,
                                                             param_file_name):NULL);
    } else {
      libs[i].fld.reset(new LengthDistribution(fld_alpha, def_fl_max,
                                               def_fl_mean, def_fl_stddev,
                                               def_fl_kernel_n,
                                               def_fl_kernel_p));
      libs[i].mismatch_table.reset((error_model) ?
new MismatchTable(mm_alpha)
                                   :NULL);
      libs[i].bias_table.reset((bias_correct) ? new BiasBoss(bias_model_order,
                                                             bias_alpha):NULL);
    }
    // All alignment files must have been mapped against the same targets.
    if (i > 0 &&
        (libs[i].map_parser->targ_index() != libs[i-1].map_parser->targ_index()
         || libs[i].map_parser->targ_lengths() !=
            libs[i-1].map_parser->targ_lengths())) {
      logger.severe("Alignment file headers do not match for '%s' and '%s'.",
                    file_names[i-1].c_str(), file_names[i].c_str());
    }
  }

  // The target table is shared by all libraries.
  boost::shared_ptr<TargetTable> targ_table(
      new TargetTable(fasta_file_name,
                      haplotype_file_name,
                      edit_detect,
                      param_file_name.size(),
                      expr_alpha, expr_alpha_map,
                      &libs));
  size_t max_target_length = 0;
  for(size_t tid=0; tid < targ_table->size(); tid++) {
    max_target_length = max(max_target_length,
                            targ_table->get_targ(tid)->length());
  }

  for (size_t i = 0; i < libs.size(); ++i) {
    libs[i].targ_table = targ_table;
    if (bias_correct) {
      libs[i].bias_table->copy_expectations(*(libs.curr_lib().bias_table));
    }
  }
  double num_targ = (double)targ_table->size();

  // Covariance entries are hashed by a pair index ~ num_targ^2; disable if
  // that product cannot be represented.
  if (calc_covar && (double)SSIZE_MAX < num_targ*(num_targ+1)) {
    logger.warn("Your system is unable to represent large enough values for "
                "efficiently hashing target pairs. Covariance calculation will "
                "be disabled.");
    calc_covar = false;
  }

  if (batch_mode) {
    targ_table->round_reset();
  }

  size_t tot_counts = threaded_calc_abundances(libs);
  if (library_size) {
    tot_counts = library_size;
  }

  if (!burned_out && bias_correct && param_file_name == "") {
    logger.warn("Not enough fragments observed to accurately learn bias "
                "parameters. Either disable bias correction "
                "(--no-bias-correct) or provide a file containing auxiliary "
                "parameters (--aux-param-file).");
  }

  if (both) {
    remaining_rounds = 1;
    online_additional = false;
  }

  if (remaining_rounds) {
    targ_table->masses_to_counts();
  }

  targ_table->round_reset();
  // Additional rounds are batch EM: weigh all fragments equally.
  ff_param = 1.0;

  first_round = false;

  // Re-stream the alignments for each remaining batch round.
  while (!last_round) {
    if (output_running_rounds) {
      output_results(libs, tot_counts, (int)remaining_rounds);
    }
    remaining_rounds--;
    logger.info("\nRe-estimating counts with additional round of EM (%d "
                "remaining)...", remaining_rounds);
    last_round = (remaining_rounds == 0);
    for (size_t l = 0; l < libs.size(); l++) {
      libs[l].map_parser->write_active(last_round);
      libs[l].map_parser->reset_reader();
    }
    tot_counts = threaded_calc_abundances(libs);
    if (library_size) {
      tot_counts = library_size;
    }
    targ_table->round_reset();
  }

  logger.info("Writing results to file...");
  output_results(libs, tot_counts);
  logger.info("Done.");

  return 0;
}

#ifdef PROTO
/**
 * Base64-encodes a binary string (used to make serialized protobufs
 * line-oriented), padding with '=' to a multiple of 4 output characters.
 * @param to_encode the raw bytes to encode.
 * @return the base64 encoding of to_encode.
 */
inline string base64_encode(const string& to_encode) {
  using namespace boost::archive::iterators;
  typedef base64_from_binary<transform_width<string::const_iterator,6,8> > it_base64_t;
  unsigned int writePaddChars = (3-to_encode.length()%3)%3;
  string base64(it_base64_t(to_encode.begin()), it_base64_t(to_encode.end()));
  base64.append(writePaddChars,'=');
  return base64;
}

/**
 * Entry point for the eXpressD (Spark) preprocessor: serializes the targets
 * and the input alignments to base64-encoded Protocol Buffers.
 * @return 0 on success (errors are fatal via logger.severe).
 */
int preprocess_main() {
  try {
    fs::create_directories(output_dir);
  } catch (fs::filesystem_error& e) {
    logger.info(e.what());
  }

  if (!fs::exists(output_dir)) {
    logger.severe("Cannot create directory %s.", output_dir.c_str());
  }

  Librarian libs(1);
  Library& lib =
libs[0];
  lib.in_file_name = in_map_file_names;
  lib.out_file_name = "";

  lib.map_parser.reset(new MapParser (&lib, false));
  lib.fld.reset(new LengthDistribution(0, 0, 0, 1, 2, 0));
  MarkovModel bias_model(3, 21, 21, 0);
  MismatchTable mismatch_table(0);
  lib.targ_table.reset(new TargetTable(fasta_file_name, "", 0, 0, 0.0, NULL,
                                       &libs));

  // One base64-encoded proto::Target per line.
  logger.info("Converting targets to Protocol Buffers...");
  fstream targ_out((output_dir + "/targets.pb").c_str(),
                   ios::out | ios::trunc);
  string out_buff;
  proto::Target target_proto;
  for (TargID id = 0; id < lib.targ_table->size(); ++id) {
    target_proto.Clear();
    Target& targ = *lib.targ_table->get_targ(id);
    target_proto.set_name(targ.name());
    target_proto.set_id((unsigned int)targ.id());
    target_proto.set_length((unsigned int)targ.length());
    target_proto.set_seq(targ.seq(0).serialize());

    target_proto.SerializeToString(&out_buff);
    targ_out << base64_encode(out_buff) << endl;
  }
  targ_out.close();

  // Fragment protos are streamed to stdout, one base64 line each.
  logger.info("Converting fragment alignments to Protocol Buffers...");
  ostream frag_out(cout.rdbuf());

  size_t num_frags = 0;
  Fragment* frag;

  ParseThreadSafety pts(10);
  boost::thread parse(&MapParser::threaded_parse, lib.map_parser.get(), &pts,
                      stop_at, 0);
  RobertsFilter frags_seen;
  proto::Fragment frag_proto;
  while(true) {
    frag_proto.Clear();

    // Pop next parsed fragment and set mass
    frag = pts.proc_in.pop();

    if (!frag) {
      break;
    }

    // Test that we have not already seen this fragment
    if (frags_seen.test_and_push(frag->name())) {
      logger.severe("Alignments are not properly sorted. Read '%s' has "
                    "alignments which are non-consecutive.",
                    frag->name().c_str());
    }

    frag_proto.set_paired(frag->paired());

    // Mismatch data from the first alignment; later alignments only store
    // their mismatches when they differ from this reference (delta encoding).
    vector<char> ref_left_mm_indices;
    vector<char> ref_left_mm_seq;
    vector<char> ref_left_mm_ref;
    vector<char> ref_right_mm_indices;
    vector<char> ref_right_mm_seq;
    vector<char> ref_right_mm_ref;
    bool ref_left_first = false;

    for (size_t i = 0; i < frag->num_hits(); ++i) {
      FragHit& fh = *(*frag)[i];
      proto::FragmentAlignment& align_proto = *frag_proto.add_alignments();
      align_proto.set_target_id((unsigned int)fh.target_id());

      vector<char> left_mm_indices;
      vector<char> left_mm_seq;
      vector<char> left_mm_ref;
      vector<char> right_mm_indices;
      vector<char> right_mm_seq;
      vector<char> right_mm_ref;

      mismatch_table.get_indices(fh, left_mm_indices, left_mm_seq, left_mm_ref,
                                 right_mm_indices, right_mm_seq, right_mm_ref);

      if (i == 0) {
        ref_left_mm_indices = left_mm_indices;
        ref_left_mm_seq = left_mm_seq;
        ref_left_mm_ref = left_mm_ref;
        ref_right_mm_indices = right_mm_indices;
        ref_right_mm_seq = right_mm_seq;
        ref_right_mm_ref = right_mm_ref;
      }

      ReadHit* read_l = fh.left_read();
      if (read_l) {
        if (i==0) { ref_left_first = read_l->first; }
        proto::ReadAlignment& read_proto = *align_proto.mutable_read_l();
        read_proto.set_first(read_l->first);
        read_proto.set_left_pos(read_l->left);
        read_proto.set_right_pos(read_l->right-1);
        // Store mismatches only when they differ from the reference hit.
        if (i == 0 || read_l->first != ref_left_first ||
            left_mm_indices != ref_left_mm_indices ||
            left_mm_seq != ref_left_mm_seq ||
            left_mm_ref != ref_left_mm_ref) {
          read_proto.set_mismatch_indices(string(left_mm_indices.begin(),
                                                 left_mm_indices.end()));
          read_proto.set_mismatch_nucs(string(left_mm_seq.begin(),
                                              left_mm_seq.end()));
        }
      }

      ReadHit* read_r = fh.right_read();
      if (read_r) {
        // The right read has the opposite first-ness of the left read, hence
        // the negation here and the '==' comparison below.
        if (i==0) { ref_left_first = !read_r->first; }
        proto::ReadAlignment& read_proto = *align_proto.mutable_read_r();
        read_proto.set_first(read_r->first);
        read_proto.set_left_pos(read_r->left);
        read_proto.set_right_pos(read_r->right-1);
        if (i == 0 || read_r->first == ref_left_first ||
            right_mm_indices != ref_right_mm_indices ||
            right_mm_seq != ref_right_mm_seq ||
            right_mm_ref != ref_right_mm_ref) {
          read_proto.set_mismatch_indices(string(right_mm_indices.begin(),
                                                 right_mm_indices.end()));
          read_proto.set_mismatch_nucs(string(right_mm_seq.begin(),
                                              right_mm_seq.end()));
        }
      }
    }
    frag_proto.SerializeToString(&out_buff);
    frag_out << base64_encode(out_buff) << endl;

    pts.proc_out.push(frag);

    num_frags++;

    // Output progress
    if (num_frags % 1000000 == 0) {
      logger.info("Fragments Processed: %d", num_frags);
    }
  }

  parse.join();

  return 0;
}
#endif


/**
 * Program entry point: seeds the RNG, parses the command line, and dispatches
 * to the preprocessor (eXpressD, PROTO builds only) or to estimation.
 * @param argc number of command-line arguments.
 * @param argv command-line argument strings.
 * @return exit code (non-zero on argument error/help).
 */
int main (int argc, char ** argv)
{

  // Seed the RNG used for sampled auxiliary-parameter updates.
  srand((unsigned int)time(NULL));
  int parse_ret = parse_options(argc,argv);
  if (parse_ret) {
    return parse_ret;
  }

#ifdef PROTO
  if (spark_pre) {
    return preprocess_main();
  }
#endif

  return estimation_main();
}