HPC Lab - Software - BiBench

Source code for bibench.algorithms.cpb

####################################################################
###     ____  _ ____                  _                          ###
###    | __ )(_) __ )  ___ _ __   ___| |__                       ###
###    |  _ \| |  _ \ / _ \ '_ \ / __| '_ \                      ###
###    | |_) | | |_) |  __/ | | | (__| | | |                     ###
###    |____/|_|____/ \___|_| |_|\___|_| |_|                     ###
###                                                              ###
###--------------------------------------------------------------###
###                                                              ###
### This file is part of the BiBench package for biclustering    ###
### analysis.                                                    ###
###                                                              ###
### Copyright (c) 2011 by:                                       ###
###   * Kemal Eren,                                              ###
###   * Mehmet Deveci,                                           ###
###   * Umit V. Catalyurek                                       ###
###                                                              ###
###--------------------------------------------------------------###
###                                                              ###
### For license info, please see the README and LICENSE files    ###
### in the main directory.                                       ###
###                                                              ###
###--------------------------------------------------------------###

"""
Correlated Pattern Bicluster (CPB) algorithm. Finds biclusters with genes
that have large pairwise Pearson correlation.
"""
import os
import subprocess

import bibench
from bibench.algorithms.wrapper import wrapper_helper
from bibench.bicluster import \
    Bicluster, BiclusterList, bicluster_algorithm, filter

BINARY = 'cpb'
INIT_BINARY = 'init_bicluster'


@bicluster_algorithm
def cpb(data, nclus, targetpcc=0.9, fixed_row=-1, fixed_col=-1, fixw=0,
        min_seed_rows=3, max_seed_rows=None):
    """
    Wrapper for the CPB binary. Finds biclusters with high row-wise
    correlation.

    Args:
        * data: numpy.ndarray
        * nclus: Number of biclusters to find.
        * targetpcc: Minimum PCC for rows; must lie in [0, 1].
        * fixed_row: A row that must be in each bicluster; -1 means none.
        * fixed_col: A column that must be in each bicluster; -1 means none.
        * fixw: Weight for computing error of fixed rows; must lie in [0, 1].
        * min_seed_rows: Minimum number of rows in each seed bicluster.
        * max_seed_rows: Maximum number of rows in each seed bicluster.
            Defaults to the number of rows in 'data'.

    Returns:
        A list of biclusters.

    Raises:
        AssertionError: if any argument is outside its valid range.

    """
    nrows, ncols = data.shape
    if max_seed_rows is None:
        max_seed_rows = nrows

    # check args
    assert nclus > 0
    assert 0 <= targetpcc <= 1
    # -1 is the "no fixed row/column" sentinel; any other value must be a
    # valid index into the dataset.
    assert fixed_row == -1 or 0 <= fixed_row < nrows
    assert fixed_col == -1 or 0 <= fixed_col < ncols
    assert 0 <= fixw <= 1
    assert min_seed_rows >= 1
    assert min_seed_rows <= max_seed_rows <= nrows

    # Snapshot every local (the parameters plus nrows/ncols) so that all of
    # them are forwarded to the helper callbacks. Do not introduce new local
    # variables above this line unless they should be forwarded too.
    kwargs = locals()

    biclusters = wrapper_helper(BINARY,
                                write_dataset=_write_dataset_,
                                read_results=_read_results_,
                                do_call=_do_call_,
                                **kwargs)
    return biclusters
def cpb_filter(biclusters, data, nclus, *args, **kwargs):
    """
    Filter out small biclusters found by chance.

    The dataset is shuffled and CPB is run on it; only biclusters whose
    area exceeds that of the largest bicluster found in the shuffled data
    are kept. 'nclus' should be large enough to generate a representative
    sample set.

    Args:
        * biclusters: a list of biclusters found by CPB.
        * data: the dataset they were all run on.
        * nclus: the number of clusters to generate for filtering.
        * args: any parameters, in order, that cpb() takes.
        * kwargs: may be any of the same named parameters as cpb() takes.

    For accurate results, use the same parameters used to generate the
    biclusters to be filtered. 'fixed_row' and 'fixed_col' are always
    overridden with -1 (none) for the run on the shuffled data.

    Returns:
        A sublist of 'biclusters', containing only those biclusters that
        are not likely due to random chance. If the shuffled run yields no
        biclusters, 'biclusters' is returned unchanged.

    """
    data = bibench.util.shuffle(data)

    # Disable the fixed row/column constraints for the shuffled run. They
    # may have been supplied positionally: after (data, nclus), cpb() takes
    # targetpcc, fixed_row, fixed_col, ... so they sit at offsets 1 and 2
    # of 'args'. Overriding the positional slot when present avoids passing
    # the same parameter both positionally and by keyword.
    args = list(args)
    for name, position in (('fixed_row', 1), ('fixed_col', 2)):
        if position < len(args):
            args[position] = -1
        else:
            kwargs[name] = -1

    # NOTE: 'filter' is the one imported from bibench.bicluster, not the
    # builtin.
    shuffle_results = filter(cpb(data, nclus, *args, **kwargs))
    if len(shuffle_results) == 0:
        return biclusters

    maxarea = max(b.area() for b in shuffle_results)
    return [b for b in biclusters if b.area() > maxarea]
def _make_init_file_(initfile, init_binary, nrows, ncols, nclus,
                     min_seed_rows, max_seed_rows, fixed_row, fixed_col,
                     **kwargs):
    """
    Run the seed-generation binary to create the initial bicluster file.

    The binary is invoked as:

        init_binary nrows ncols nclus min_seed_rows max_seed_rows
                    fixed_row fixed_col initfile

    Extra keyword arguments are accepted and ignored so that callers can
    pass a larger parameter dictionary.

    Raises:
        Exception: if the binary could not be executed (OSError).
        subprocess.CalledProcessError: if it exits with a nonzero status.

    """
    # Build an argument list instead of splitting a formatted string, so
    # that paths containing whitespace are passed as a single argument.
    command = [init_binary] + [
        str(arg) for arg in (nrows, ncols, nclus, min_seed_rows,
                             max_seed_rows, fixed_row, fixed_col, initfile)]
    try:
        subprocess.check_call(command)
    except OSError:
        raise Exception(
            "Error calling '{0}'. Is it on the PATH?".format(init_binary))


def _do_call_(data, datafile, results_dir, **kwargs):
    """
    Generate the initial bicluster file, then run the CPB binary.

    The init file is written as 'initfile.txt' in the parent directory of
    'results_dir'. CPB is run with 'results_dir' as its working directory.

    'kwargs' must contain 'targetpcc' and 'fixw', plus everything that
    _make_init_file_() requires other than 'initfile' and 'init_binary'.

    Raises:
        Exception: if a binary could not be executed (OSError).
        subprocess.CalledProcessError: if a binary exits with a nonzero
            status.

    """
    datafile = os.path.abspath(datafile)
    results_dir = os.path.abspath(results_dir)

    # initial bicluster file
    directory = os.path.split(results_dir)[0]
    kwargs['initfile'] = os.path.join(directory, 'initfile.txt')
    kwargs['init_binary'] = INIT_BINARY
    _make_init_file_(**kwargs)

    # The literal '1' is passed through exactly as before; its meaning is
    # defined by the cpb binary.
    command = [BINARY, datafile, kwargs['initfile'], '1',
               str(kwargs['targetpcc']), str(kwargs['fixw'])]
    try:
        # Run the child in the results directory via 'cwd' rather than
        # changing the working directory of this whole process.
        subprocess.check_call(command, cwd=results_dir)
    except OSError:
        raise Exception("Error calling 'cpb'. Is it on the PATH?")


def _write_dataset_(data, filename):
    """
    Writes a dataset to the format that CPB reads:

        [num rows] [num columns]
        [first row's values]
        [second row's values]
        ...
        [last row's values]

    Values within a row are separated by single spaces and the file ends
    with a newline.

    """
    rows, cols = data.shape
    lines = ["{0} {1}".format(rows, cols)]
    lines.extend(" ".join([str(j) for j in row]) for row in data)
    # The context manager closes the file even if a write fails.
    with open(filename, 'w') as outfile:
        outfile.write("\n".join(lines))
        outfile.write("\n")


def _read_results_(results_dir, data):
    """
    Read every '.out' file in 'results_dir' as one bicluster.

    Files are processed in sorted name order so that the result order is
    deterministic.

    Returns:
        A list with one bicluster per '.out' file.

    """
    outfiles = sorted(f for f in os.listdir(results_dir)
                      if os.path.splitext(f)[1] == '.out')
    return [_read_result_file_(os.path.join(results_dir, f), data)
            for f in outfiles]


def _read_result_file_(filename, data):
    """
    Reads the bicluster in a single CPB output file.

    The file format is:

        ROWS
        [row index] [row score]
        [row index] [row score]
        ...
        [row index] [row score]
        COLS
        [col index] [col score]
        [col index] [col score]
        ...
        [col index] [col score]

    Only the indices are used; the scores are ignored. Blank lines are
    skipped.

    Returns:
        A Bicluster with sorted row and column indices.

    """
    rows, cols = [], []
    with open(filename, 'r') as f:
        target = rows
        for line in f:
            fields = line.split()
            if not fields:
                # tolerate blank lines, e.g. a trailing newline
                continue
            if line[0] == 'R':
                # 'ROWS' header; indices already go to 'rows'
                continue
            elif line[0] == 'C':
                # 'COLS' header; subsequent indices are columns
                target = cols
                continue
            else:
                target.append(int(fields[0]))
    rows.sort()
    cols.sort()
    return Bicluster(rows, cols, data=data)