/* Copyright 2009-2012 Andreas Biegert, Christof Angermueller This file is part of the CS-BLAST package. The CS-BLAST package is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. The CS-BLAST package is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . */ #include "cstranslate_app.h" #include #include extern "C" { #include } namespace cs { template class CSTranslateMpiApp : public CSTranslateApp { public: void static payload(void* env, const size_t start, const size_t end) { CSTranslateMpiApp* app = (CSTranslateMpiApp*)env; app->Payload(start, end); } virtual int Run() { std::string input_data_file = this->opts_.infile + ".ffdata"; std::string input_index_file = this->opts_.infile + ".ffindex"; const bool isCa3m = this->opts_.informat == "ca3m"; if (isCa3m) { // infile has to be the ffindex basepath with no suffices input_data_file = this->opts_.infile + "_ca3m.ffdata"; input_index_file = this->opts_.infile + "_ca3m.ffindex"; } FFindexDatabase input(const_cast(input_data_file.c_str()), const_cast(input_index_file.c_str()), isCa3m); input.ensureLinearAccess(); //prepare output ffindex cs219 database std::string data_filename_out = this->opts_.outfile + ".ffdata"; std::string index_filename_out = this->opts_.outfile + ".ffindex"; int mpq_status = MPQ_Init(this->argc_, (const char**) this->argv_, input.db_index->n_entries); if (mpq_status == MPQ_SUCCESS) { if (MPQ_rank == MPQ_MASTER) { MPQ_Master(1); } else { this->SetupEmissions(); this->SetupPseudocountEngine(); this->SetupAbstractStateEngine(); this->input_index = input.db_index; this->input_data = input.db_data; FFindexDatabase *header_db = NULL; FFindexDatabase *sequence_db = NULL; if (isCa3m) { std::string input_header_data_file = this->opts_.infile + "_header.ffdata"; std::string input_header_index_file = this->opts_.infile + "_header.ffindex"; header_db = new FFindexDatabase(const_cast(input_header_data_file.c_str()), const_cast(input_header_index_file.c_str()), false); std::string input_sequence_data_file = this->opts_.infile + "_sequence.ffdata"; std::string input_sequence_index_file = this->opts_.infile + "_sequence.ffindex"; sequence_db = new FFindexDatabase(const_cast(input_sequence_data_file.c_str()), const_cast(input_sequence_index_file.c_str()), false); } this->input_header_index = header_db ? header_db->db_index : NULL; this->input_header_data = header_db ? header_db->db_data : NULL; this->input_sequence_index = sequence_db ? sequence_db->db_index : NULL; this->input_sequence_data = sequence_db ? sequence_db->db_data : NULL; this->data_file_out = openWrite(data_filename_out.c_str()); this->index_file_out = openWrite(index_filename_out.c_str()); this->offset = 0; std::string log_filename_out = this->opts_.outfile + ".log"; this->log_file = openWrite(log_filename_out.c_str()); MPQ_Worker(payload, this); if (this->log_file) { int fd = fileno(this->log_file); fflush(this->log_file); fsync(fd); fclose(this->log_file); } if (this->index_file_out) { int fd = fileno(this->index_file_out); fflush(this->index_file_out); fsync(fd); fclose(this->index_file_out); } if (this->data_file_out) { int fd = fileno(this->data_file_out); fflush(this->data_file_out); fsync(fd); fclose(this->data_file_out); } if (isCa3m) { delete sequence_db; delete header_db; } } MPI_Barrier(MPI_COMM_WORLD); if (MPQ_rank == MPQ_MASTER) { ffmerge_splits(data_filename_out.c_str(), index_filename_out.c_str(), 1, MPQ_size - 1, true); } } else { if (mpq_status == MPQ_ERROR_NO_WORKERS) { fprintf(stderr, "MPQ_Init: Needs at least one worker process.\n"); exit(EXIT_FAILURE); } } MPI_Finalize(); return EXIT_SUCCESS; }; void Payload(const size_t start, const size_t end) { for (size_t entry_index = start; entry_index < end; entry_index++) { ffindex_entry_t *entry = ffindex_get_entry_by_index(this->input_index, entry_index); if (entry == NULL) { fprintf(this->log_file, "Could not open entry %zu from input ffindex!\n", entry_index); continue; } if (this->opts_.verbose) { fprintf(this->log_file, "Processing entry: %s\n", entry->name); } std::ostringstream a3m_buffer; std::string a3m_string; FILE *inf; if (this->opts_.informat == "ca3m") { char *entry_data = ffindex_get_data_by_entry(this->input_data, entry); compressed_a3m::extract_a3m(entry_data, entry->length, this->input_sequence_index, this->input_sequence_data, this->input_header_index, this->input_header_data, &a3m_buffer); a3m_string = a3m_buffer.str(); inf = fmemopen(static_cast(const_cast(a3m_string.c_str())), a3m_string.length(), "r"); } else { inf = ffindex_fopen_by_entry(this->input_data, entry); } if (inf == NULL) { fprintf(this->log_file, "Could not open input entry (%s)!\n", entry->name); continue; } string header; CountProfile profile; // input profile we want to translate try { this->ReadProfile(inf, header, profile); } catch (const Exception &e) { fprintf(this->log_file, "Could not read entry: %s, Message: %s\n", entry->name, e.what()); continue; } size_t profile_counts_length = profile.counts.length(); CountProfile as_profile(profile_counts_length); // output profile this->Translate(profile, as_profile); // Prepare abstract sequence in AS219 format Sequence as_seq(profile_counts_length); as_seq.set_header(header); this->BuildSequence(as_profile, profile_counts_length, as_seq); std::stringstream out_buffer; if (this->opts_.outformat == "seq") { this->WriteStateSequence(as_seq, out_buffer); } else { this->WriteStateProfile(as_profile, out_buffer); } std::string out_string = out_buffer.str(); ffindex_insert_memory(this->data_file_out, this->index_file_out, &(this->offset), const_cast(out_string.c_str()), out_string.size(), entry->name); // FIXME: we are leaking inf, but if we fclose we get weird crashes //fclose(inf); if (entry_index % 1000 == 0) { fflush(this->data_file_out); fflush(this->index_file_out); } } }; // Parses command line options. virtual void ParseOptions(GetOpt_pp &ops) { ops >> Option('i', "infile", this->opts_.infile, this->opts_.infile); ops >> Option('o', "outfile", this->opts_.outfile, this->opts_.outfile); ops >> Option('I', "informat", this->opts_.informat, this->opts_.informat); ops >> Option('O', "outformat", this->opts_.outformat, this->opts_.outformat); ops >> Option('M', "match-assign", this->opts_.match_assign, this->opts_.match_assign); ops >> Option('x', "pc-admix", this->opts_.pc_admix, this->opts_.pc_admix); ops >> Option('c', "pc-ali", this->opts_.pc_ali, this->opts_.pc_ali); ops >> Option('A', "alphabet", this->opts_.alphabetfile, this->opts_.alphabetfile); ops >> Option('D', "context-data", this->opts_.modelfile, this->opts_.modelfile); ops >> Option('w', "weight", this->opts_.weight_as, this->opts_.weight_as); ops >> Option('v', "verbose", this->opts_.verbose, this->opts_.verbose); this->opts_.Validate(); if (this->opts_.informat == "auto") this->opts_.informat = GetFileExt(this->opts_.infile); }; // Prints options summary to stream. virtual void PrintOptions() const { fprintf(this->out_, " %-30s %s\n", "-i, --infile ", "Input ffindex with alignments or sequences"); fprintf(this->out_, " %-30s %s\n", "-o, --outfile ", "Output ffindex for generated abstract state sequence"); fprintf(this->out_, " %-30s %s (def=%s)\n", "-I, --informat prf|seq|fas|...", "Input format: prf, seq, fas, a2m, a3m or ca3m", this->opts_.informat.c_str()); fprintf(this->out_, " %-30s %s (def=%s)\n", "-O, --outformat seq|prf", "Outformat: abstract state sequence or profile", this->opts_.outformat.c_str()); fprintf(this->out_, " %-30s %s\n", "-M, --match-assign [0:100]", "Make all FASTA columns with less than X% gaps match columns"); fprintf(this->out_, " %-30s %s\n", "", "(def: make columns with residue in first sequence match columns)"); fprintf(this->out_, " %-30s %s (def=off)\n", "-A, --alphabet ", "Abstract state alphabet consisting of exactly 219 states"); fprintf(this->out_, " %-30s %s (def=off)\n", "-D, --context-data ", "Add context-specific pseudocounts using given context-data"); fprintf(this->out_, " %-30s %s (def=%-.2f)\n", "-x, --pc-admix [0,1]", "Pseudocount admix for context-specific pseudocounts", this->opts_.pc_admix); fprintf(this->out_, " %-30s %s (def=%-.1f)\n", "-c, --pc-ali [0,inf[", "Constant in pseudocount calculation for alignments", this->opts_.pc_ali); fprintf(this->out_, " %-30s %s (def=%-.2f)\n", "-w, --weight [0,inf[", "Weight of abstract state column in emission calculation", this->opts_.weight_as); }; // Prints usage banner to stream. virtual void PrintUsage() const { fputs("Usage: cstranslate_mpi -i -o -A [options]\n", this->out_); }; private: FILE* openWrite(const char* path) { char out_rank[FILENAME_MAX]; snprintf(out_rank, FILENAME_MAX, "%s.%d", path, MPQ_rank); FILE* out = fopen(out_rank, "w+"); if (out == NULL) { fprintf(this->out_, "Could not open ffindex output file! (%s)!\n", out_rank); exit(1); } return out; }; ffindex_index_t *input_index; char *input_data; ffindex_index_t *input_sequence_index; char *input_sequence_data; ffindex_index_t *input_header_index; char *input_header_data; FILE *data_file_out; FILE *index_file_out; size_t offset; FILE *log_file; }; }