task.py 35.9 KB
Newer Older
1
# coding: utf8
Mathieu Giraud's avatar
Mathieu Giraud committed
2 3
from __future__ import print_function

4
import json
5
import os
6
import defs
7
import re
8
import os.path
9 10 11
import time
import sys
import datetime
12 13
import random
import xmlrpclib
14
import tools_utils
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30

def assert_scheduler_task_does_not_exist(args):
    """Report whether a scheduler task with exactly these arguments is still live.

    :param args: the task arguments as stored in ``scheduler_task.args``
        (callers in this module pass ``str(list_of_args)``).
    :returns: ``{"message": "task already registered"}`` when a matching task
        exists whose status is not FAILED, EXPIRED, TIMEOUT or COMPLETED;
        ``None`` otherwise.
    """
    finished_statuses = ["FAILED", "EXPIRED", "TIMEOUT", "COMPLETED"]
    live_tasks = db((db.scheduler_task.args == args)
                    & ~db.scheduler_task.status.belongs(finished_statuses))

    if live_tasks.count() == 0:
        return None
    return {"message": "task already registered"}

31 32 33 34 35 36 37 38 39 40 41
def compute_contamination(sequence_file_id, results_file_id, config_id):
    """Look for clones of each sample that are much larger in another sample of the same run.

    The three arguments are parallel lists: position ``i`` describes one
    sample to analyse (its sequence file id, the id of its results file and
    the config id to compare against).

    For each sample, the other sequence files of its "run" sample set that
    have a visible results file for the same config are considered. For each
    of them, the most recent results file that already has data is used. A
    clone of the analysed sample is counted when the same clone id appears in
    the other sample with more than 10 times as many reads.

    :returns: one dict per sample::

            {"total_clones": int, "total_reads": int,
             "sample": {other_results_file_id: {"clones": int, "reads": int}}}

        Read counts are those of the analysed sample.
    :raises ValueError: if the analysed sample's own results file is not
        valid JSON. Invalid files of the other samples are reported and
        skipped.
    """
    result = []
    for seq_id, res_id, conf_id in zip(sequence_file_id, results_file_id, config_id):
        sample_result = {"total_clones": 0, "total_reads": 0, "sample": {}}
        result.append(sample_result)

        # Reads of each clone of the analysed sample, keyed by clone id.
        with open(defs.DIR_RESULTS + db.results_file[res_id].data_file, "r") as json_data:
            own = json.load(json_data)
        # Be robust against missing or 'null' values for clones
        own_reads = {clone["id"]: clone["reads"][0] for clone in (own.get("clones") or [])}

        # Other samples of the same run
        sample_set_run = db((db.sample_set_membership.sequence_file_id == seq_id)
                            & (db.sample_set_membership.sample_set_id == db.sample_set.id)
                            & (db.sample_set.sample_type == "run")).select().first()
        if sample_set_run is None:
            continue

        rows = db((db.sample_set.id == sample_set_run.sample_set.id)
                  & (db.sample_set.id == db.sample_set_membership.sample_set_id)
                  & (db.sequence_file.id == db.sample_set_membership.sequence_file_id)
                  & (db.sequence_file.id != seq_id)
                  & (db.results_file.sequence_file_id == db.sequence_file.id)
                  & (db.results_file.hidden == False)
                  & (db.results_file.config_id == conf_id)
                  ).select(db.sequence_file.ALL, db.results_file.ALL, db.sample_set.id,
                           orderby=db.sequence_file.id | ~db.results_file.run_date)

        # Rows are sorted by sequence file, newest run first. Keep the first
        # row of each sequence file that has data: a freshly scheduled run has
        # no data_file yet and cannot be read.
        latest_rows = []
        previous_seq_id = None
        for row in rows:
            if row.results_file.data_file is None or row.sequence_file.id == previous_seq_id:
                continue
            latest_rows.append(row)
            previous_seq_id = row.sequence_file.id

        for row in latest_rows:
            other_path = defs.DIR_RESULTS + row.results_file.data_file
            print(other_path)
            counts = {"clones": 0, "reads": 0}
            sample_result["sample"][row.results_file.id] = counts

            with open(other_path, "r") as json_data2:
                try:
                    other = json.load(json_data2)
                except ValueError:
                    print('invalid_json')
                    continue

            for clone in (other.get("clones") or []):
                own_count = own_reads.get(clone["id"])
                if own_count is not None and clone["reads"][0] > 10 * own_count:
                    sample_result["total_clones"] += 1
                    sample_result["total_reads"] += own_count
                    counts["clones"] += 1
                    counts["reads"] += own_count

    return result
    
104
def compute_extra(id_file, id_config, min_threshold):
    """Add a per-locus clone-count distribution to a stored results file.

    Loads the most recent visible results file of sequence file *id_file*
    for config *id_config*. For each locus listed in ``reads.germline``, it
    counts the clones whose read count is at least *min_threshold* percent
    of that locus' total reads. It stores these counts as
    ``reads.distribution`` (``{locus: [count]}``) and rewrites the file.

    Clones whose locus has no total in ``reads.germline`` are not counted.
    A ``null`` ``clones`` value is rewritten as an empty list.

    :returns: "SUCCESS", or "FAIL" when no such results file (or no data
        file) exists, or when its content is not valid JSON.
    """
    results_file = db((db.results_file.sequence_file_id == id_file) &
                      (db.results_file.hidden == False) &
                      (db.results_file.config_id == id_config)
                      ).select(orderby=~db.results_file.run_date).first()
    if results_file is None or results_file.data_file is None:
        print('no results file')
        return "FAIL"

    filename = defs.DIR_RESULTS + results_file.data_file
    with open(filename, "rb") as rf:
        try:
            d = json.load(rf)
        except ValueError:
            print('invalid_json')
            return "FAIL"

    reads = d.setdefault('reads', {})

    # Minimal read count for a clone to be counted, per locus
    result = {}
    loci_min = {}
    for locus, totals in reads.get('germline', {}).items():
        result[locus] = [0]
        loci_min[locus] = totals[0] * (min_threshold / 100.0)

    # Be robust against 'null' values for clones
    if 'clones' in d and d['clones'] is None:
        d['clones'] = []

    for clone in d.get('clones') or []:
        germline = clone['germline']
        if germline in loci_min and clone['reads'][0] >= loci_min[germline]:
            result[germline][0] += 1

    reads['distribution'] = result
    with open(filename, 'wb') as extra:
        json.dump(d, extra)
    return "SUCCESS"
140

141

142
def schedule_run(id_sequence, id_config, grep_reads=None):
    """Create a results_file entry for a sequence file and queue its analysis task.

    A new results_file row is inserted. It is hidden when *grep_reads* is
    given. A scheduler task running the config's program is then queued with
    args ``[id_sequence, id_config, data_id, grep_reads]``. If the sequence
    file has a pre-process flag that is neither COMPLETED nor DONE, the task
    is marked STOPPED.

    :returns: a dict with "redirect", "processId" and "message" on success,
        or the error dict returned by assert_scheduler_task_does_not_exist.
        In the error case, the inserted results_file row is left in place.
    """
    ts = time.time()
    data_id = db.results_file.insert(sequence_file_id = id_sequence,
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     hidden = grep_reads is not None,
                                     config_id = id_config)

    args = [id_sequence, id_config, data_id, grep_reads]

    # NOTE(review): args contains the data_id that was just inserted, so this
    # check is unlikely to ever match a task queued by an earlier call.
    # Confirm the intended de-duplication.
    err = assert_scheduler_task_does_not_exist(str(args))
    if err:
        log.error(err)
        return err

    program = db.config[id_config].program
    ## add task to scheduler
    task = scheduler.queue_task(program, args,
                                repeats = 1, timeout = defs.TASK_TIMEOUT)

    sequence_file = db.sequence_file[id_sequence]
    pre_process_flag = sequence_file.pre_process_flag
    # Pre-process requested but not finished: do not let the task start yet
    if pre_process_flag and pre_process_flag not in ["COMPLETED", "DONE"]:
        db.scheduler_task[task.id] = dict(status ="STOPPED")

    db.results_file[data_id] = dict(scheduler_task_id = task.id)

    res = {"redirect": "reload",
           "processId": task.id,
           "message": "[%s] c%s: process requested - %s %s" % (data_id, id_config, grep_reads, sequence_file.filename)}

    log.info(res)
    return res

182 183 184 185 186 187 188
def schedule_fuse(sample_set_ids, config_ids):
    """Queue a single 'refuse' task that re-fuses the given sample sets.

    For every (sample set, config) pair, one visible results file of a
    sequence belonging to the sample set is looked up. Each match contributes
    the run_fuse argument list ``[sequence_file_id, config_id,
    results_file_id, sample_set_id, False]``. Nothing is queued when no pair
    matches.
    """
    fuse_args = []
    pairs = [(ss_id, cfg_id) for ss_id in sample_set_ids for cfg_id in config_ids]
    for sample_set_id, config_id in pairs:
        match = db((db.sample_set_membership.sample_set_id == sample_set_id)
                   & (db.sample_set_membership.sequence_file_id == db.results_file.sequence_file_id)
                   & (db.results_file.config_id == config_id)
                   & (db.results_file.hidden == False)
                   ).select(db.sample_set_membership.sample_set_id,
                            db.sample_set_membership.sequence_file_id,
                            db.results_file.id,
                            db.results_file.config_id).first()
        if not match:
            continue
        membership = match.sample_set_membership
        fuse_args.append([membership.sequence_file_id,
                          match.results_file.config_id,
                          match.results_file.id,
                          membership.sample_set_id,
                          False])

    if fuse_args:
        scheduler.queue_task('refuse', [fuse_args],
                             repeats = 1, timeout = defs.TASK_TIMEOUT)
198

199 200 201 202
def schedule_compute_extra(id_file, id_config, min_threshold):
    """Queue a one-shot 'compute_extra' task for a sequence file and config.

    The queued task object is not returned.
    """
    scheduler.queue_task('compute_extra',
                         [id_file, id_config, min_threshold],
                         repeats=1,
                         timeout=defs.TASK_TIMEOUT)

203
def _parse_vidjil_log(out_log):
    """Build a one-line summary from the "==>" lines of a vidjil-algo log file.

    Each matched line is echoed to stdout. Returns e.g.
    "1000 reads, 900 segmented (90.0%), 12 windows", or '' when no line matches.
    """
    segmented = re.compile(r"==> segmented (\d+) reads \((\d*\.\d+|\d+)%\)")
    windows = re.compile(r"==> found (\d+) .*-windows in .* segments .* inside (\d+) sequences")
    info = ''
    with open(out_log) as log_lines:
        for line in log_lines:
            m = segmented.search(line)
            if m:
                print(line, end=' ')
                segs = int(m.group(1))
                ratio = m.group(2)
                info = "%d segmented (%s%%)" % (segs, ratio)
                continue
            m = windows.search(line)
            if m:
                print(line, end=' ')
                wins = int(m.group(1))
                reads = int(m.group(2))
                info = "%d reads, " % reads + info + ", %d windows" % wins
                break
    return info


def run_vidjil(id_file, id_config, id_data, grep_reads,
               clean_before=False, clean_after=False):
    """Run vidjil-algo on a sequence file and store the result in results_file *id_data*.

    - If the sequence file's pre-process FAILED, raises ValueError.
    - If a pre-process is set but not COMPLETED/DONE, re-queues this task
      1200 s later and returns "SUCCESS".
    - Otherwise, the output folder of *id_data* is recreated and vidjil-algo
      is run with the config command. When *grep_reads* is a pure
      ACGTN sequence, the reads containing it are extracted instead.
    - The result file is stored in the database. Without *grep_reads*, extra
      statistics are computed and every sample set containing the sequence
      file is fused.

    *clean_before* is not used (the output folder is always recreated).
    *clean_after* removes the output folder at the very end.

    :returns: "SUCCESS". Re-raises any error occurring while running
        vidjil-algo or opening its result file.
    """
    import shutil
    from subprocess import Popen, PIPE, STDOUT
    from datetime import timedelta as timed

    sequence_file = db.sequence_file[id_file]
    pre_process_flag = sequence_file.pre_process_flag

    if pre_process_flag == "FAILED":
        print("Pre-process has failed")
        raise ValueError('pre-process has failed')

    ## re-schedule if pre_process is still pending
    if pre_process_flag and pre_process_flag not in ["COMPLETED", "DONE"]:
        print("Pre-process is still pending, re-schedule")
        args = [id_file, id_config, id_data, grep_reads]
        task = scheduler.queue_task("vidjil", args,
                                    repeats = 1, timeout = defs.TASK_TIMEOUT,
                                    start_time=request.now + timed(seconds=1200))
        db.results_file[id_data] = dict(scheduler_task_id = task.id)
        db.commit()
        print(task.id)
        sys.stdout.flush()
        return "SUCCESS"

    ## paths to vidjil and to the sequence files
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    shutil.rmtree(out_folder, ignore_errors=True)

    ## path of the sequence file
    filename = db(db.sequence_file.id==id_file).select()[0].data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = defs.DIR_SEQUENCES + filename

    ## vidjil configuration
    vidjil_cmd = db.config[id_config].command
    if 'next' in vidjil_cmd:
        vidjil_cmd = vidjil_cmd.replace('next', '')
        vidjil_cmd = vidjil_cmd.replace(' germline' , defs.DIR_GERMLINE_NEXT)
        cmd = defs.DIR_VIDJIL_NEXT + '/vidjil-algo '
    else:
        vidjil_cmd = vidjil_cmd.replace(' germline' , defs.DIR_GERMLINE)
        cmd = defs.DIR_VIDJIL + '/vidjil-algo '

    # Only pure nucleotide sequences are passed to the shell command line
    if grep_reads and re.match(r"^[acgtnACGTN]+$", grep_reads):
        vidjil_cmd += ' --out-clone-files --grep-reads "%s" ' % grep_reads

    os.makedirs(out_folder)
    out_log = out_folder+'/'+output_filename+'.vidjil.log'

    with open(out_log, 'w') as vidjil_log_file:
        try:
            ## full command
            cmd += ' -o  ' + out_folder + " -b " + output_filename
            cmd += ' ' + vidjil_cmd + ' '+ seq_file

            ## run vidjil
            print("=== Launching Vidjil ===")
            print(cmd)
            print("========================")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=vidjil_log_file, stderr=STDOUT, close_fds=True)
            p.communicate()

            print("Output log in " + out_log)
            sys.stdout.flush()
            db.commit()

            ## Get result file
            if grep_reads:
                out_results = out_folder + '/seq/clone.fa-1'
            else:
                out_results = out_folder + '/' + output_filename + '.vidjil'

            print("===>", out_results)
            results_filepath = os.path.abspath(out_results)
            stream = open(results_filepath, 'rb')
        except Exception:
            print("!!! Vidjil failed, no result file")
            res = {"message": "[%s] c%s: Vidjil FAILED - %s" % (id_data, id_config, out_folder)}
            log.error(res)
            raise

    ## Parse some info in .log
    info = _parse_vidjil_log(out_log)

    ## store the result in the database
    ts = time.time()
    # The DAL stores the uploaded stream during the update; it can be closed afterwards
    with stream:
        db.results_file[id_data] = dict(status = "ready",
                                        run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file = stream
                                        )
    db.commit()

    ## TODO parse result success/fail
    res = {"message": "[%s] c%s: Vidjil finished - %s - %s" % (id_data, id_config, info, out_folder)}
    log.info(res)

    if not grep_reads:
        memberships = db(db.sample_set_membership.sequence_file_id==id_file).select()
        if len(memberships) > 0:
            # compute_extra reads and rewrites the same results file whatever the
            # sample set, so it is run once before fusing each sample set.
            compute_extra(id_file, id_config, 5)
        for membership in memberships:
            sample_set_id = membership.sample_set_id
            print(sample_set_id)
            run_fuse(id_file, id_config, id_data, sample_set_id, clean_before = False)

    os.remove(results_filepath)

    # Clean only at the end: run_fuse writes its log in out_folder and the
    # result file removed above lives there too.
    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)

    return "SUCCESS"
357

Alexander Shlemov's avatar
Alexander Shlemov committed
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448
def run_igrec(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Run IgReC on a sequence file and store its .vidjil output in results_file *id_data*.

    The output folder of *id_data* is recreated and ``igrec.py`` is run with
    the config command. The resulting JSON gets the original file name and
    the command line added (via fill_field), is stored in the database, and
    every sample set containing the sequence file is then fused.

    *clean_before* and *clean_after* are accepted for interface compatibility
    with the other run_* tasks but are not used.

    :returns: "SUCCESS". Re-raises any error occurring while running IgReC
        or opening its result file.
    """
    import shutil
    from subprocess import Popen, PIPE, STDOUT

    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    shutil.rmtree(out_folder, ignore_errors=True)

    ## path of the sequence file
    original_name = db(db.sequence_file.id==id_file).select()[0].data_file
    seq_file = defs.DIR_SEQUENCES + original_name

    ## IgReC configuration
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)
    out_log = out_folder + '/vidjil-igrec.log'
    out_results = out_folder + "/out/igrec.vidjil"

    with open(out_log, 'w') as log_file:
        try:
            igrec = defs.DIR_IGREC + '/igrec.py'
            if not os.path.isfile(igrec):
                print("!!! IgReC binary file not found")
            ## full command
            cmd = "%s -s %s -o %s/out %s" % (igrec, seq_file, out_folder, arg_cmd)

            ## run IgReC
            print("=== Launching IgReC ===")
            print(cmd)
            print("========================")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
            p.wait()

            print("Output log in " + out_log)
            sys.stdout.flush()

            ## Get result file (raises IOError when IgReC produced none)
            print("===>", out_results)
            results_filepath = os.path.abspath(out_results)
            open(results_filepath, 'rb').close()
        except Exception:
            print("!!! IgReC failed, no result file")
            res = {"message": "[%s] c%s: IgReC FAILED - %s" % (id_data, id_config, out_folder)}
            log.error(res)
            raise

    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)
    fill_field(my_json, original_name, "original_names", "samples")
    fill_field(my_json, cmd, "commandline", "samples")

    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## store the result in the database
    ts = time.time()
    with open(results_filepath, 'rb') as stream:
        db.results_file[id_data] = dict(status = "ready",
                                        run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file = stream
                                        )
    db.commit()

    res = {"message": "[%s] c%s: IgReC - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id==id_file).select():
        sample_set_id = membership.sample_set_id
        print(sample_set_id)
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before = False)

    return "SUCCESS"

449
def run_mixcr(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Run MiXCR (align, assemble, exportClones) on a sequence file and store the result.

    The config command is expected to be
    ``args_align | args_assemble | args_exportClones``. With any other number
    of '|'-separated parts, a warning is printed and all three are left
    empty. Logs, reports, intermediate files and the exported ``.mixcr`` file
    are written in the output folder of *id_data*, which is recreated first.
    The exported JSON is enriched (via fill_field) with the MiXCR reports,
    the original file name, the command line and the total number of reads.
    It is stored as the data file of results_file *id_data*, and every
    sample set containing the sequence file is then fused.

    *clean_before* and *clean_after* are accepted for interface compatibility
    with the other run_* tasks but are not used.

    :returns: "SUCCESS". Re-raises any error occurring while running MiXCR
        or opening its result file.
    """
    import shutil
    from subprocess import Popen, PIPE, STDOUT

    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    shutil.rmtree(out_folder, ignore_errors=True)

    ## path of the sequence file
    original_name = db(db.sequence_file.id==id_file).select()[0].data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = defs.DIR_SEQUENCES + original_name

    ## MiXCR configuration
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)

    # os.path.join keeps every file inside out_folder whether or not
    # DIR_OUT_VIDJIL_ID ends with a '/'
    out_log = os.path.join(out_folder, output_filename + '.mixcr.log')
    report = os.path.join(out_folder, output_filename + '.mixcr.report')
    out_alignments = os.path.join(out_folder, output_filename + '.align.vdjca')
    out_clones = os.path.join(out_folder, output_filename + '.clones.clns')
    out_results = os.path.join(out_folder, output_filename + '.mixcr')
    align_report = report + '.aln'
    assembly_report = report + '.asmbl'

    arg_cmds = arg_cmd.split('|')
    if len(arg_cmds) == 3:
        args_1, args_2, args_3 = arg_cmds
    else:
        args_1, args_2, args_3 = '', '', ''
        print(arg_cmd)
        print("! Bad arguments, we expect args_align | args_assemble | args_exportClones")

    with open(out_log, 'w') as log_file:
        try:
            ## full command
            mixcr = os.path.join(defs.DIR_MIXCR, 'mixcr')
            cmd = mixcr + ' align --save-reads -t 1 -r ' + align_report + ' ' + args_1 + ' ' + seq_file  + ' ' + out_alignments
            cmd += ' && '
            cmd += mixcr + ' assemble -t 1 -r ' + assembly_report + ' ' + args_2 + ' ' + out_alignments + ' ' + out_clones
            cmd += ' && rm ' + out_alignments
            cmd += ' && '
            cmd += mixcr + ' exportClones --format vidjil -germline -id -name -reads -sequence -top -seg -s ' + args_3 + ' ' + out_clones + ' ' + out_results

            ## run MiXCR
            print("=== Launching MiXCR ===")
            print(cmd)
            print("========================")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
            p.wait()

            print("Output log in " + out_log)
            sys.stdout.flush()

            ## Get result file (raises IOError when MiXCR produced none)
            print("===>", out_results)
            results_filepath = os.path.abspath(out_results)
            open(results_filepath, 'rb').close()
        except Exception:
            print("!!! MiXCR failed, no result file")
            res = {"message": "[%s] c%s: MiXCR FAILED - %s" % (id_data, id_config, out_folder)}
            log.error(res)
            raise

    align_log = get_file_content(align_report)
    assembly_log = get_file_content(assembly_report)
    reports = align_log + assembly_log
    totalReads = extract_total_reads(assembly_log)

    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)
    fill_field(my_json, reports, "log", "samples", True)
    fill_field(my_json, original_name, "original_names", "samples")
    fill_field(my_json, cmd, "commandline", "samples")
    fill_field(my_json, totalReads, "total", "reads")

    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## store the result in the database
    ts = time.time()
    with open(results_filepath, 'rb') as stream:
        db.results_file[id_data] = dict(status = "ready",
                                        run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file = stream
                                        )
    db.commit()

    res = {"message": "[%s] c%s: MiXCR finished - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id==id_file).select():
        sample_set_id = membership.sample_set_id
        print(sample_set_id)
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before = False)

    os.remove(results_filepath)

    return "SUCCESS"
564

565
def run_copy(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Store the uploaded sequence file itself as the data of results_file *id_data*.

    The output folder of *id_data* is recreated with an empty
    ``.vidjil.log`` file. The sequence file is then copied into the
    results_file upload field, and every sample set containing the sequence
    file is fused. *clean_before* is not used. *clean_after* removes the
    output folder once fusing is done.

    :returns: "SUCCESS".
    :raises IOError: when the sequence file cannot be opened (the original
        IOError is re-raised).
    """
    import shutil

    ## paths to the sequence files
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    shutil.rmtree(out_folder, ignore_errors=True)

    ## path of the sequence file
    sequence_file = db(db.sequence_file.id==id_file).select()[0]
    filename = sequence_file.data_file

    os.makedirs(out_folder)
    out_log = out_folder+'/'+output_filename+'.vidjil.log'
    # Create the log file; nothing is written to it here
    open(out_log, 'w').close()

    print("Output log in "+out_log)
    sys.stdout.flush()
    db.commit()

    ## fetch the file
    results_filepath = os.path.abspath(defs.DIR_SEQUENCES+filename)

    try:
        stream = open(results_filepath, 'rb')
    except IOError:
        print("!!! 'copy' failed, no file")
        res = {"message": "[%s] c%s: 'copy' FAILED - %s - %s" % (id_data, id_config, results_filepath, out_folder)}
        log.error(res)
        raise

    ## store the result in the database
    ts = time.time()
    with stream:
        stored_name = db.results_file.data_file.store(stream, sequence_file.filename)
    db.results_file[id_data] = dict(status = "ready",
                                    run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                    data_file = stored_name
                                    )
    db.commit()

    ## TODO parse result success/fail
    res = {"message": "[%s] c%s: 'copy' finished - %s" % (id_data, id_config, filename)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id==id_file).select():
        sample_set_id = membership.sample_set_id
        print(sample_set_id)
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before = False)

    # Clean only after fusing: run_fuse writes its log inside out_folder
    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)

    return "SUCCESS"
Mathieu Giraud's avatar
Mathieu Giraud committed
627 628


629 630 631 632
def run_refuse(args):
    """Run run_fuse once per entry of *args*.

    Each entry is indexable as
    ``[id_file, id_config, id_data, sample_set_id, clean_before]``.
    Always returns "SUCCESS".
    """
    for entry in args:
        run_fuse(entry[0], entry[1], entry[2], entry[3], clean_before=entry[4])
    return "SUCCESS"
Mathieu Giraud's avatar
Mathieu Giraud committed
633

634
def run_fuse(id_file, id_config, id_data, sample_set_id, clean_before=True, clean_after=False):
    """Fuse the latest results of every sequence file of a sample set with fuse.py.

    For each sequence file of *sample_set_id*, the most recent visible
    results file for *id_config* is used, provided it already has data; its
    pre-process file is attached when there is one. The fused output is
    stored in the fused_file row for (*id_config*, *sample_set_id*), which is
    created if needed.

    *clean_before* recreates the output folder of *id_data* first.
    *clean_after* removes it at the end.

    :returns: "SUCCESS", or "FAILED" when there is nothing to fuse.
        Re-raises any error occurring while running fuse.py or opening its
        output.
    """
    import shutil
    from subprocess import Popen, PIPE, STDOUT

    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data + '-%s' % sample_set_id

    if clean_before:
        shutil.rmtree(out_folder, ignore_errors=True)
        os.makedirs(out_folder)

    out_log = out_folder+'/'+output_filename+'.fuse.log'
    output_file = out_folder+'/'+output_filename+'.fused'

    rows = db( ( db.results_file.sequence_file_id == db.sequence_file.id )
               & ( db.sample_set_membership.sequence_file_id == db.sequence_file.id)
               & ( db.sample_set_membership.sample_set_id == sample_set_id)
               & ( db.results_file.config_id == id_config )
               & ( db.results_file.hidden == False)
               ).select( orderby=db.sequence_file.id|~db.results_file.run_date)

    # Rows are sorted by sequence file, newest run first: only the first row
    # of each sequence file is considered.
    fuse_inputs = []
    sequence_file_list = ""
    previous_seq_id = 0
    for row in rows:
        if row.sequence_file.id == previous_seq_id:
            continue
        previous_seq_id = row.sequence_file.id
        if row.results_file.data_file is None:
            continue
        fuse_input = defs.DIR_RESULTS + row.results_file.data_file
        if row.sequence_file.pre_process_file:
            fuse_input += ",%s/%s" % (defs.DIR_RESULTS, row.sequence_file.pre_process_file)
        fuse_inputs.append(fuse_input)
        sequence_file_list += str(row.results_file.sequence_file_id) + "_"

    if not fuse_inputs:
        print("!!! Fuse failed: no files to fuse")
        res = {"message": "[%s] c%s: 'fuse' FAILED - %s no files to fuse" % (id_data, id_config, output_file)}
        log.error(res)
        return "FAILED"

    with open(out_log, 'w') as fuse_log_file:
        try:
            fuse_cmd = db.config[id_config].fuse_command
            cmd = "python "+defs.DIR_FUSE+"/fuse.py -o "+ output_file + " " + fuse_cmd + " " + " ".join(fuse_inputs)

            print("=== fuse.py ===")
            print(cmd)
            print("===============")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=fuse_log_file, stderr=STDOUT, close_fds=True)
            p.communicate()
            print("Output log in "+out_log)

            fuse_filepath = os.path.abspath(output_file)
            stream = open(fuse_filepath, 'rb')
        except Exception:
            print("!!! Fuse failed, no .fused file")
            res = {"message": "[%s] c%s: 'fuse' FAILED - %s" % (id_data, id_config, output_file)}
            log.error(res)
            raise

    ts = time.time()

    fused_files = db( ( db.fused_file.config_id == id_config ) &
                      ( db.fused_file.sample_set_id == sample_set_id )
                      ).select()
    existing_fused_file = None
    if len(fused_files) > 0:
        fused_file = fused_files[0]
        id_fuse = fused_file.id
        existing_fused_file = fused_file.fused_file
    else:
        id_fuse = db.fused_file.insert(sample_set_id = sample_set_id,
                                       config_id = id_config)

    with stream:
        db.fused_file[id_fuse] = dict(fuse_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                      fused_file = stream,
                                      sequence_file_list = sequence_file_list)
    db.commit()

    # Remove the temporary fused file before any clean-up of out_folder
    # (which would otherwise make this removal fail)
    os.remove(output_file)

    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)
        # remove previous fused_file if it exists
        # NOTE(review): this path is under out_folder, which was just removed,
        # so this is a no-op; the stored fused file presumably lives in the
        # upload folder instead. Confirm the intended location.
        if existing_fused_file is not None:
            stale = "%s/%s" % (out_folder, existing_fused_file)
            if os.path.isfile(stale):
                os.remove(stale)

    res = {"message": "[%s] c%s: 'fuse' finished - %s" % (id_data, id_config, db.fused_file[id_fuse].fused_file)}
    log.info(res)

    return "SUCCESS"
742

743
def custom_fuse(file_list):
    '''
    Fuse the results files whose ids are listed in file_list and return the
    parsed fused data.

    file_list: ids of db.results_file rows (joined with ',' in the log
               message, so they are expected to be strings). Rows whose
               data_file is None are skipped. When the corresponding
               sequence file has a pre_process_file, it is passed to fuse.py
               together with the results file ("data,pre_process").

    The fuse.py command line is built here and executed through the XML-RPC
    fuse server at defs.FUSE_SERVER:defs.PORT_FUSE_SERVER. The returned
    .fused file is read back and parsed, then the temporary working folder
    is removed.

    Raises IOError when this server has no fuse server port configured.
    Any error while contacting the fuse server or reading its output is
    logged and re-raised.
    '''
    from subprocess import Popen, PIPE, STDOUT, os

    if defs.PORT_FUSE_SERVER is None:
        raise IOError('This server cannot fuse custom data')

    random_id = random.randint(99999999,99999999999)
    out_folder = os.path.abspath(defs.DIR_OUT_VIDJIL_ID % random_id)
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % random_id

    # Start from an empty working folder
    cmd = "rm -rf "+out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()
    os.makedirs(out_folder)

    res = {"message": "'custom fuse' (%d files): %s" % (len(file_list), ','.join(file_list))}
    log.info(res)

    ## fuse.py
    output_file = out_folder+'/'+output_filename+'.fused'
    files = ""
    for results_id in file_list :
        # Fetch the row once instead of querying it three times
        results_file = db.results_file[results_id]
        if results_file.data_file is not None :
            files += os.path.abspath(defs.DIR_RESULTS + results_file.data_file)
            seq_file = db.sequence_file[results_file.sequence_file_id]
            if seq_file.pre_process_file is not None:
                files += ",%s" % os.path.abspath(defs.DIR_RESULTS + seq_file.pre_process_file)
            files += " "

    try:
        cmd = "python "+ os.path.abspath(defs.DIR_FUSE) +"/fuse.py -o "+output_file+" -t 100 "+files
        proc_srvr = xmlrpclib.ServerProxy("http://%s:%d" % (defs.FUSE_SERVER, defs.PORT_FUSE_SERVER))
        fuse_filepath = proc_srvr.fuse(cmd, out_folder, output_filename)

        # Close the fused file once parsed: out_folder (which contains it)
        # is removed right afterwards.
        with open(fuse_filepath, 'rb') as f:
            data = gluon.contrib.simplejson.loads(f.read())
    except Exception:
        res = {"message": "'custom fuse' -> IOError"}
        log.error(res)
        raise

    clean_cmd = "rm -rf " + out_folder
    p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    res = {"message": "'custom fuse' -> finished"}
    log.info(res)

    return data
791

HERBERT Ryan's avatar
HERBERT Ryan committed
792
#TODO move this ?
793 794 795 796 797 798
def get_file_content(filename):
    '''Return the whole content of the file `filename`, read in binary mode.'''
    with open(filename, 'rb') as handle:
        return handle.read()

HERBERT Ryan's avatar
HERBERT Ryan committed
799 800 801 802 803 804 805 806 807 808 809 810 811
def fill_field(dest, value, field, parent="", append=False):
    '''
    Store `value` as the single-element list dest[field] (or, when `parent`
    is neither None nor "", dest[parent][field]).

    If the field already exists, its first element is replaced by `value`,
    or extended with `+= value` when `append` is True.
    '''
    target = dest
    if parent is not None and parent != "":
        target = dest[parent]

    if field not in target:
        target[field] = [value]
    elif append:
        target[field][0] += value
    else:
        target[field][0] = value

HERBERT Ryan's avatar
HERBERT Ryan committed
812 813 814 815
def extract_total_reads(report):
    '''
    Return, as a string, the number following "Total Reads analysed: " in
    the text `report`.

    Raises AttributeError when no such line is present (no match).
    '''
    found = re.compile("Total Reads analysed: [0-9]+").search(report)
    return found.group().rsplit(' ', 1)[-1]
816 817 818 819 820 821 822 823 824 825 826 827 828 829

def schedule_pre_process(sequence_file_id, pre_process_id):
    '''
    Queue a "pre_process" scheduler task for a sequence file.

    If a scheduler task with the same args is already registered and not
    in a FAILED/EXPIRED/TIMEOUT/COMPLETED state, nothing is queued. The
    error dict from assert_scheduler_task_does_not_exist is logged and
    returned instead.

    Otherwise the task is queued (one repeat, timeout defs.TASK_TIMEOUT).
    Its id is stored in sequence_file.pre_process_scheduler_task_id, and a
    {"redirect": "reload", "message": ...} dict is returned.
    '''
    args = [pre_process_id, sequence_file_id]

    # Avoid queuing a duplicate of a task that has not finished yet
    err = assert_scheduler_task_does_not_exist(str(args))
    if err:
        log.error(err)
        return err

    task = scheduler.queue_task("pre_process", args,
                                repeats = 1, timeout = defs.TASK_TIMEOUT)
    db.sequence_file[sequence_file_id] = dict(pre_process_scheduler_task_id = task.id)

    res = {"redirect": "reload",
           "message": "{%s} (%s): process requested" % (sequence_file_id, pre_process_id)}

    log.info(res)
    return res


839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855
def get_preprocessed_filename(filename1, filename2):
    '''
    Build the name of the output of a pre-process run on two files.

    First take the suffix common to both names (typically the extension,
    e.g. '.fastq.gz'). Then take the common part of the remaining stems, as
    computed by tools_utils.common_substring. If the stems have nothing in
    common, they are joined with '_' instead. When filename2 is empty,
    filename1 is returned unchanged.

    >>> get_preprocessed_filename('lsdkj_toto-tata_mlkmfsdlkf.fastq.gz', 'rete_toto-tata_eekjdf.fastq.gz')
    '_toto-tata_.fastq.gz'
    '''
    if not filename2:
        return filename1
    extension = tools_utils.get_common_suffpref([filename1, filename2], min(len(filename1), len(filename2)), -1)
    # Use an explicit end index: with no common suffix, x[0:-len(extension)]
    # would be x[0:-0] == '' instead of the whole name.
    without_extension = [x[:len(x) - len(extension)] for x in [filename1, filename2]]
    common = tools_utils.common_substring(without_extension)
    if len(common) == 0:
        common = '_'.join(without_extension)
    return common+extension


856
def run_pre_process(pre_process_id, sequence_file_id, clean_before=True, clean_after=False):
Mathieu Giraud's avatar
Mathieu Giraud committed
857 858 859 860 861
    '''
    Run a pre-process on sequence_file.data_file (and possibly sequence_file.data_file+2),
    put the output back in sequence_file.data_file.
    '''

862 863
    from subprocess import Popen, PIPE, STDOUT, os
    
864
    sequence_file = db.sequence_file[sequence_file_id]
865 866 867
    db.sequence_file[sequence_file_id] = dict(pre_process_flag = "RUN")
    db.commit()
    
868
    out_folder = defs.DIR_PRE_VIDJIL_ID % sequence_file_id
869 870
    output_filename = get_preprocessed_filename(get_original_filename(sequence_file.data_file),
                                                get_original_filename(sequence_file.data_file2))