# coding: utf8
from __future__ import print_function

4
import json
5
import os
6
import defs
7
import re
8
import os.path
9 10 11
import time
import sys
import datetime
12 13
import random
import xmlrpclib
14
import tools_utils
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30

def assert_scheduler_task_does_not_exist(args):
    """Report whether a still-live scheduler task already uses *args*.

    A task is considered live when its status is none of FAILED, EXPIRED,
    TIMEOUT or COMPLETED.

    :param args: value compared with ``db.scheduler_task.args``
    :return: ``{"message": "task already registered"}`` when at least one
             such task exists, ``None`` otherwise
    """
    tasks = db.scheduler_task
    live_with_same_args = (  (tasks.args == args)
                           & (tasks.status != "FAILED")
                           & (tasks.status != "EXPIRED")
                           & (tasks.status != "TIMEOUT")
                           & (tasks.status != "COMPLETED"))
    matches = db(live_with_same_args).select()

    # Guard clause: nothing registered, nothing to report.
    if len(matches) == 0:
        return None

    return {"message": "task already registered"}

31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
def compute_contamination(sequence_file_id, results_file_id, config_id):
    """Count, for each sample, the clones that dominate in another sample of its run.

    The three arguments are parallel lists: entry ``i`` of each describes one
    sample.  The clones of the sample's results file are compared with the
    most recent results (same config) of the other sequence files belonging
    to the same sample set of type "run".  A clone is counted when another
    sample holds more than ten times as many reads of it.

    Other samples whose latest result has no data file yet are skipped, and
    a results file that is not valid JSON is reported with 'invalid_json'
    and counted as zero.

    :param sequence_file_id: list of sequence file ids
    :param results_file_id: list of results file ids, one per sequence file
    :param config_id: list of config ids, one per sequence file
    :return: list with one dict per sample, shaped as
             ``{"total_clones": int, "total_reads": int,
             "sample": {other_results_file_id: {"clones": int, "reads": int}}}``
    """
    result = []
    for i, seq_id in enumerate(sequence_file_id):
        entry = {"total_clones": 0, "total_reads": 0, "sample": {}}
        result.append(entry)

        # Read count (first value of the "reads" list) of every clone of
        # the current sample.
        sample_path = defs.DIR_RESULTS + db.results_file[results_file_id[i]].data_file
        with open(sample_path, "r") as json_data:
            d = json.load(json_data)
        own_reads = {}
        for clone in d["clones"] or []:
            own_reads[clone["id"]] = clone["reads"][0]

        # Find the "run" sample set this sequence file belongs to, if any.
        sample_set_run = db( (db.sample_set_membership.sequence_file_id == seq_id)
                           & (db.sample_set_membership.sample_set_id == db.sample_set.id)
                           & (db.sample_set.sample_type == "run") ).select().first()
        if sample_set_run is None:
            continue

        sample_set_id = sample_set_run.sample_set.id
        rows = db(  ( db.sample_set.id == sample_set_id )
                  & ( db.sample_set.id == db.sample_set_membership.sample_set_id )
                  & ( db.sequence_file.id == db.sample_set_membership.sequence_file_id)
                  & ( db.sequence_file.id != seq_id)
                  & ( db.results_file.sequence_file_id == db.sequence_file.id )
                  & ( db.results_file.config_id == config_id[i]  )
                  ).select(db.sequence_file.ALL, db.results_file.ALL, db.sample_set.id,
                           orderby=db.sequence_file.id|~db.results_file.run_date)

        # Rows are sorted by sequence file, newest result first: keep only
        # the first (most recent) result of each sequence file.
        latest = []
        last_seen = 0
        for row in rows:
            if row.sequence_file.id != last_seen:
                latest.append(row)
                last_seen = row.sequence_file.id

        for row in latest:
            if row.results_file.data_file is None:
                # The latest run of that sample has produced no file yet
                # (run_fuse skips such rows the same way).
                continue

            other_path = defs.DIR_RESULTS + row.results_file.data_file
            print(other_path)
            counts = {"clones": 0, "reads": 0}
            entry["sample"][row.results_file.id] = counts

            with open(other_path, "r") as json_data2:
                try:
                    d = json.load(json_data2)
                except ValueError:
                    print('invalid_json')
                    continue

            for clone in d["clones"] or []:
                if clone["id"] in own_reads:
                    own = own_reads[clone["id"]]
                    if clone["reads"][0] > 10 * own:
                        entry["total_clones"] += 1
                        entry["total_reads"] += own
                        counts["clones"] += 1
                        counts["reads"] += own

    return result
    
94
def compute_extra(id_file, id_config, min_threshold):
    """Add a per-locus count of clones above a threshold to the latest results file.

    Loads the most recent results file of (id_file, id_config).  For each
    locus listed under ``reads.germline`` it counts the clones whose first
    read count reaches ``min_threshold`` percent of the first value of that
    locus' total.  The counts are stored as ``reads.distribution`` (one
    single-element list per locus) and the file is rewritten in place.

    Clones whose germline has no entry in ``reads.germline`` are ignored:
    there is no total to derive a threshold from.

    :param id_file: sequence file id
    :param id_config: config id
    :param min_threshold: percentage of a locus' reads a clone must reach
    :return: "SUCCESS", or "FAIL" when there is no results file to read or
             its content is not valid JSON
    """
    results_file = db((db.results_file.sequence_file_id == id_file) &
                      (db.results_file.config_id == id_config)
                    ).select(orderby=~db.results_file.run_date).first()
    if results_file is None or results_file.data_file is None:
        # No run, or a run that has not produced its file yet.
        print('no_results_file')
        return "FAIL"

    filename = defs.DIR_RESULTS + results_file.data_file
    with open(filename, "rb") as rf:
        try:
            d = json.load(rf)
        except ValueError:
            print('invalid_json')
            return "FAIL"

    result = {}
    loci_min = {}
    if 'reads' in d and 'germline' in d['reads']:
        loci_totals = d['reads']['germline']
        for locus in loci_totals:
            result[locus] = [0]
            loci_min[locus] = loci_totals[locus][0] * (min_threshold/100.0)

    for clone in d.get('clones') or []:
        germline = clone['germline']
        if germline in loci_min and clone['reads'][0] >= loci_min[germline]:
            result[germline][0] += 1

    # 'reads' may be absent from the file: create it rather than fail.
    d.setdefault('reads', {})['distribution'] = result
    # Text mode: json.dump writes str, and the output contains no newline
    # that a platform could translate.
    with open(filename, 'w') as extra:
        json.dump(d, extra)
    return "SUCCESS"
124

125

126
def schedule_run(id_sequence, id_config, grep_reads=None):
    """Create a results_file row and queue the analysis task that will fill it.

    The task queued is the program of config ``id_config`` with arguments
    ``[id_sequence, id_config, data_id, grep_reads]``.  When the sequence
    file has a pre-process flag that is neither "COMPLETED" nor "DONE", the
    queued task is set to status "STOPPED".

    :param id_sequence: id of the sequence file to process
    :param id_config: id of the config to run
    :param grep_reads: optional value forwarded to the task
    :return: ``{"redirect": "reload", "message": ...}`` on success, or the
             error dict of ``assert_scheduler_task_does_not_exist``
    """
    ts = time.time()
    data_id = db.results_file.insert(sequence_file_id = id_sequence,
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     config_id = id_config )

    args = [id_sequence, id_config, data_id, grep_reads]

    # NOTE(review): args contains the id of the row inserted just above, so
    # this check presumably cannot match a task queued by an earlier call;
    # on error the inserted row is also left in place.  Confirm intent.
    err = assert_scheduler_task_does_not_exist(str(args))
    if err:
        log.error(err)
        return err

    program = db.config[id_config].program
    ## add task to scheduler
    task = scheduler.queue_task(program, args,
                                repeats = 1, timeout = defs.TASK_TIMEOUT)

    # One look-up of the sequence file row serves both the flag and the name.
    sequence_file = db.sequence_file[id_sequence]
    pre_process_flag = sequence_file.pre_process_flag
    if pre_process_flag and pre_process_flag not in ["COMPLETED", "DONE"]:
        db.scheduler_task[task.id] = dict(status ="STOPPED")

    db.results_file[data_id] = dict(scheduler_task_id = task.id)

    res = {"redirect": "reload",
           "message": "[%s] c%s: process requested - %s %s" % (data_id, id_config, grep_reads, sequence_file.filename)}

    log.info(res)
    return res

164 165 166 167 168 169 170 171 172 173 174 175
def schedule_fuse(sample_set_ids, config_ids):
    """Queue one 'refuse' task covering every (sample set, config) pair with results.

    For each pair, the first results_file row found for a sequence file of
    the sample set and the given config yields one argument list
    ``[sequence_file_id, config_id, results_file_id, sample_set_id, False]``.
    Nothing is queued when no pair has results.

    :param sample_set_ids: iterable of sample set ids
    :param config_ids: iterable of config ids
    :return: None
    """
    args = []
    for sample_set_id in sample_set_ids:
        for config_id in config_ids:
            row = db((db.sample_set_membership.sample_set_id == sample_set_id)
                   & (db.sample_set_membership.sequence_file_id == db.results_file.sequence_file_id)
                   & (db.results_file.config_id == config_id)
                ).select(db.sample_set_membership.sample_set_id, db.sample_set_membership.sequence_file_id,
                        db.results_file.id, db.results_file.config_id).first()
            if row:
                args.append([row.sample_set_membership.sequence_file_id, row.results_file.config_id,
                             row.results_file.id, row.sample_set_membership.sample_set_id, False])

    if args:
        # The whole list travels as the single argument of run_refuse.
        scheduler.queue_task('refuse', [args],
                             repeats = 1, timeout = defs.TASK_TIMEOUT)
179

180 181 182 183
def schedule_compute_extra(id_file, id_config, min_threshold):
    """Queue a single 'compute_extra' scheduler task.

    :param id_file: sequence file id forwarded to the task
    :param id_config: config id forwarded to the task
    :param min_threshold: threshold forwarded to the task
    :return: None
    """
    scheduler.queue_task('compute_extra',
                         [id_file, id_config,  min_threshold],
                         repeats=1,
                         timeout=defs.TASK_TIMEOUT)

184
def run_vidjil(id_file, id_config, id_data, grep_reads,
               clean_before=False, clean_after=False):
    """Run vidjil-algo on one sequence file and store its output as a result.

    If the pre-process of the sequence file is still pending, the task is
    queued again (start time 1200 seconds after ``request.now``) and nothing
    else is done.  Otherwise the command of config ``id_config`` is run in a
    fresh output folder, the log is scanned for summary figures, the
    produced file is attached to ``db.results_file[id_data]`` and, unless
    ``grep_reads`` is set, ``compute_extra`` and ``run_fuse`` are called for
    every sample set containing the sequence file.

    :param id_file: id of the sequence file to analyse
    :param id_config: id of the config holding the command line
    :param id_data: id of the results_file row to fill
    :param grep_reads: optional value passed to vidjil-algo through ``-FaW``;
                       when set, the result file is ``seq/clone.fa-1``
    :param clean_before: unused, kept for interface compatibility
    :param clean_after: remove the output folder once the result is stored
    :return: "SUCCESS"
    :raises ValueError: when the pre-process of the sequence file has failed
    :raises Exception: any error met while running the program or opening
                       its result file is logged, then re-raised
    """
    from subprocess import Popen, PIPE, STDOUT, os
    from datetime import timedelta as timed
    try:
        from shlex import quote as shell_quote   # Python 3
    except ImportError:
        from pipes import quote as shell_quote   # Python 2

    pre_process_flag = db.sequence_file[id_file].pre_process_flag
    if pre_process_flag == "FAILED":
        print("Pre-process has failed")
        raise ValueError('pre-process has failed')

    ## re-schedule if pre_process is still pending
    if pre_process_flag and pre_process_flag not in ["COMPLETED", "DONE"]:
        print("Pre-process is still pending, re-schedule")

        args = [id_file, id_config, id_data, grep_reads]
        task = scheduler.queue_task("vidjil", args,
                                    repeats = 1, timeout = defs.TASK_TIMEOUT,
                                    start_time=request.now + timed(seconds=1200))
        db.results_file[id_data] = dict(scheduler_task_id = task.id)
        db.commit()
        print(task.id)
        sys.stdout.flush()

        return "SUCCESS"

    ## paths to vidjil and to the sequence files
    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    cmd = "rm -rf "+out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    row = db(db.sequence_file.id==id_file).select()
    filename = row[0].data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = upload_folder+filename

    ## vidjil configuration
    vidjil_cmd = db.config[id_config].command

    if 'next' in vidjil_cmd:
        vidjil_cmd = vidjil_cmd.replace('next', '')
        vidjil_cmd = vidjil_cmd.replace(' germline' , defs.DIR_GERMLINE_NEXT)
        cmd = defs.DIR_VIDJIL_NEXT + '/vidjil-algo '
    else:
        vidjil_cmd = vidjil_cmd.replace(' germline' , defs.DIR_GERMLINE)
        cmd = defs.DIR_VIDJIL + '/vidjil-algo '

    if grep_reads:
        # grep_reads ends up in a shell command line: quote it so that it
        # can only ever be read as one argument.
        vidjil_cmd += ' -FaW %s ' % shell_quote("%s" % grep_reads)

    os.makedirs(out_folder)
    out_log = out_folder+'/'+output_filename+'.vidjil.log'
    vidjil_log_file = open(out_log, 'w')

    try:
        ## complete command
        cmd += ' -o  ' + out_folder + " -b " + output_filename
        cmd += ' ' + vidjil_cmd + ' '+ seq_file

        ## run the vidjil command
        print("=== Launching Vidjil ===")
        print(cmd)
        print("========================")
        sys.stdout.flush()

        p = Popen(cmd, shell=True, stdin=PIPE, stdout=vidjil_log_file, stderr=STDOUT, close_fds=True)
        p.communicate()

        print("Output log in " + out_log)
        sys.stdout.flush()
        db.commit()

        ## Get result file
        if grep_reads:
            out_results = out_folder + '/seq/clone.fa-1'
        else:
            out_results = out_folder + '/' + output_filename + '.vidjil'

        print("===>", out_results)
        results_filepath = os.path.abspath(out_results)

        stream = open(results_filepath, 'rb')
    except Exception:
        print("!!! Vidjil failed, no result file")
        res = {"message": "[%s] c%s: Vidjil FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        raise
    finally:
        # Closed on both paths, before the log is read back below.
        vidjil_log_file.close()

    try:
        ## Parse some info in .log
        segmented = re.compile(r"==> segmented (\d+) reads \((\d*\.\d+|\d+)%\)")
        windows = re.compile(r"==> found (\d+) .*-windows in .* segments .* inside (\d+) sequences")
        info = ''
        with open(out_log) as log_lines:
            for l in log_lines:
                m = segmented.search(l)
                if m:
                    print(l, end=' ')
                    segs = int(m.group(1))
                    ratio = m.group(2)
                    info = "%d segmented (%s%%)" % (segs, ratio)
                    continue
                m = windows.search(l)
                if m:
                    print(l, end=' ')
                    wins = int(m.group(1))
                    reads = int(m.group(2))
                    info = "%d reads, " % reads + info + ", %d windows" % wins
                    break

        ## insertion into the database
        ts = time.time()

        db.results_file[id_data] = dict(status = "ready",
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     data_file = stream
                                    )

        db.commit()
    finally:
        # The upload has been copied by the update above (or has failed).
        stream.close()

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

    ## the Vidjil output is stored as the result for the scheduler
    ## TODO parse result success/fail

    res = {"message": "[%s] c%s: Vidjil finished - %s - %s" % (id_data, id_config, info, out_folder)}
    log.info(res)

    if not grep_reads:
        for membership in db(db.sample_set_membership.sequence_file_id==id_file).select() :
            sample_set_id = membership.sample_set_id
            print(membership.sample_set_id)
            compute_extra(id_file, id_config, 5)
            run_fuse(id_file, id_config, id_data, sample_set_id, clean_before = False)

    return "SUCCESS"
337

Alexander Shlemov's avatar
Alexander Shlemov committed
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428
def run_igrec(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Run IgReC on one sequence file and store its output as a result.

    The command of config ``id_config`` is appended to the ``igrec.py``
    call, which runs in a fresh output folder.  The produced
    ``out/igrec.vidjil`` file is completed with the original file name and
    the command line, attached to ``db.results_file[id_data]``, and
    ``run_fuse`` is called for every sample set containing the sequence
    file.

    :param id_file: id of the sequence file to analyse
    :param id_config: id of the config holding the IgReC arguments
    :param id_data: id of the results_file row to fill
    :param clean_before: unused, kept for interface compatibility
    :param clean_after: unused, kept for interface compatibility
    :return: "SUCCESS"
    :raises Exception: any error met while running the program or opening
                       its result file is logged, then re-raised
    """
    from subprocess import Popen, PIPE, STDOUT, os
    import time
    import json

    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    # FIXME Use shutil instead
    cmd = "rm -rf "+out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    row = db(db.sequence_file.id==id_file).select()
    filename = row[0].data_file
    seq_file = upload_folder+filename

    ## IgReC arguments
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)
    out_log = out_folder + '/vidjil-igrec.log'
    out_results = out_folder + "/out/igrec.vidjil"

    log_file = open(out_log, 'w')
    ## complete command
    try:
        igrec = defs.DIR_IGREC + '/igrec.py'
        if not os.path.isfile(igrec):
            # Only reported here: the missing result file makes the run
            # fail a few lines below.
            print("!!! IgReC binary file not found")
        cmd = "%s -s %s -o %s/out %s" % (igrec, seq_file, out_folder, arg_cmd)

        ## run the IgReC command
        print("=== Launching IgReC ===")
        print(cmd)
        print("========================")
        sys.stdout.flush()

        p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
        p.wait()

        print("Output log in " + out_log)
        sys.stdout.flush()

        ## Get result file
        print("===>", out_results)
        results_filepath = os.path.abspath(out_results)
        # Opening the file only checks that the run produced it.
        open(results_filepath, 'rb').close()
    except Exception:
        print("!!! IgReC failed, no result file")
        res = {"message": "[%s] c%s: IgReC FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        raise
    finally:
        log_file.close()

    original_name = filename
    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)
    fill_field(my_json, original_name, "original_names", "samples")
    fill_field(my_json, cmd, "commandline", "samples")

    # The file is read, closed, then reopened for writing: the original
    # author reported a bad file descriptor error otherwise.
    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## insertion into the database
    ts = time.time()

    stream = open(results_filepath, 'rb')
    try:
        db.results_file[id_data] = dict(status = "ready",
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     data_file = stream
                                    )
        db.commit()
    finally:
        stream.close()

    res = {"message": "[%s] c%s: IgReC - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id==id_file).select() :
        sample_set_id = membership.sample_set_id
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before = False)

    return "SUCCESS"

429
def run_mixcr(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Run MiXCR (align, assemble, exportClones) on one sequence file.

    The command of config ``id_config`` must hold three '|'-separated
    argument groups, one for each MiXCR step.  When it does not, a warning
    is printed and the three steps run without extra arguments.  The
    exported file is completed with the MiXCR reports, the original file
    name, the command line and the total number of reads, attached to
    ``db.results_file[id_data]``, and ``run_fuse`` is called for every
    sample set containing the sequence file.

    :param id_file: id of the sequence file to analyse
    :param id_config: id of the config holding the MiXCR arguments
    :param id_data: id of the results_file row to fill
    :param clean_before: unused, kept for interface compatibility
    :param clean_after: unused, kept for interface compatibility
    :return: "SUCCESS"
    :raises Exception: any error met while running the program or opening
                       its result file is logged, then re-raised
    """
    from subprocess import Popen, PIPE, STDOUT, os
    import time
    import json

    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    cmd = "rm -rf "+out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    row = db(db.sequence_file.id==id_file).select()
    filename = row[0].data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = upload_folder+filename

    ## MiXCR arguments
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)
    # NOTE(review): unlike run_vidjil, these paths join out_folder and
    # output_filename without a '/' separator - confirm this is intended.
    out_log = out_folder + output_filename+'.mixcr.log'
    report = out_folder + output_filename + '.mixcr.report'

    out_alignments = out_folder + output_filename + '.align.vdjca'
    out_clones =  out_folder + output_filename + '.clones.clns'
    out_results_file = output_filename + '.mixcr'
    out_results = out_folder + out_results_file

    align_report = report + '.aln'
    assembly_report = report + '.asmbl'

    args_1, args_2, args_3 = '', '', ''
    try:
        args_1, args_2, args_3 = arg_cmd.split('|')
    except ValueError:
        # Not exactly three groups: go on with empty arguments.
        print(arg_cmd)
        print("! Bad arguments, we expect args_align | args_assemble | args_exportClones")

    log_file = open(out_log, 'w')
    ## complete command
    try:
        mixcr = defs.DIR_MIXCR + 'mixcr'
        cmd = mixcr + ' align --save-reads -t 1 -r ' + align_report + ' ' + args_1 + ' ' + seq_file  + ' ' + out_alignments
        cmd += ' && '
        cmd += mixcr + ' assemble -t 1 -r ' + assembly_report + ' ' + args_2 + ' ' + out_alignments + ' ' + out_clones
        cmd += ' && rm ' + out_alignments
        cmd += ' && '
        cmd += mixcr + ' exportClones --format vidjil -germline -id -name -reads -sequence -top -seg -s ' + args_3 + ' ' + out_clones + ' ' + out_results

        ## run the MiXCR command
        print("=== Launching MiXCR ===")
        print(cmd)
        print("========================")
        sys.stdout.flush()

        p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
        p.wait()

        print("Output log in " + out_log)
        sys.stdout.flush()

        ## Get result file
        print("===>", out_results)
        results_filepath = os.path.abspath(out_results)
        # Opening the file only checks that the run produced it.
        open(results_filepath, 'rb').close()
    except Exception:
        print("!!! MiXCR failed, no result file")
        res = {"message": "[%s] c%s: MiXCR FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        raise
    finally:
        log_file.close()

    align_content = get_file_content(align_report)
    assembly_content = get_file_content(assembly_report)
    reports = align_content + assembly_content
    original_name = filename
    totalReads = extract_total_reads(assembly_content)
    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)
    fill_field(my_json, reports, "log", "samples", True)
    fill_field(my_json, original_name, "original_names", "samples")
    fill_field(my_json, cmd, "commandline", "samples")
    fill_field(my_json, totalReads, "total", "reads")

    # The file is read, closed, then reopened for writing: the original
    # author reported a bad file descriptor error otherwise.
    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## insertion into the database
    ts = time.time()

    stream = open(results_filepath, 'rb')
    try:
        db.results_file[id_data] = dict(status = "ready",
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     data_file = stream
                                    )
        db.commit()
    finally:
        stream.close()

    res = {"message": "[%s] c%s: MiXCR finished - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id==id_file).select() :
        sample_set_id = membership.sample_set_id
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before = False)

    return "SUCCESS"
543

544
def run_copy(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Store the uploaded sequence file itself as the result of a run.

    The data file of the sequence file is copied into
    ``db.results_file[id_data]`` under the sequence file's ``filename``,
    then ``run_fuse`` is called for every sample set containing the
    sequence file.

    :param id_file: id of the sequence file to copy
    :param id_config: id of the config, used in messages and for the fuse
    :param id_data: id of the results_file row to fill
    :param clean_before: unused, kept for interface compatibility
    :param clean_after: remove the output folder once the result is stored
    :return: "SUCCESS"
    :raises IOError: when the sequence file cannot be opened
    """
    from subprocess import Popen, PIPE, STDOUT, os

    ## paths to the sequence files and to the output folder
    upload_folder = defs.DIR_SEQUENCES
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    cmd = "rm -rf "+out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    row = db(db.sequence_file.id==id_file).select()
    filename = row[0].data_file

    os.makedirs(out_folder)
    log_path = out_folder+'/'+output_filename+'.vidjil.log'
    # Nothing is written to this log: create it empty and release the handle.
    open(log_path, 'w').close()

    print("Output log in "+log_path)
    sys.stdout.flush()
    db.commit()

    ## fetch the file
    results_filepath = os.path.abspath(upload_folder+filename)

    try:
        stream = open(results_filepath, 'rb')
    except IOError:
        print("!!! 'copy' failed, no file")
        res = {"message": "[%s] c%s: 'copy' FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        # Bare raise keeps the errno and file name of the original error.
        raise

    try:
        ## insertion into the database
        ts = time.time()

        db.results_file[id_data] = dict(status = "ready",
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     data_file = db.results_file.data_file.store(stream, row[0].filename)
                                    )
        db.commit()
    finally:
        stream.close()

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

    ## TODO parse result success/fail

    res = {"message": "[%s] c%s: 'copy' finished - %s" % (id_data, id_config, filename)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id==id_file).select() :
        sample_set_id = membership.sample_set_id
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before = False)

    return "SUCCESS"
Mathieu Giraud's avatar
Mathieu Giraud committed
606 607


608 609 610 611
def run_refuse(args):
    """Run ``run_fuse`` once for every entry of *args*.

    Each entry is indexed from 0 to 4 and supplies, in order, the id_file,
    id_config, id_data, sample_set_id and clean_before arguments of
    ``run_fuse``.

    :param args: iterable of indexable entries with at least five items
    :return: "SUCCESS"
    """
    for job in args:
        id_file = job[0]
        id_config = job[1]
        id_data = job[2]
        sample_set_id = job[3]
        clean_before = job[4]
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before)
    return "SUCCESS"
Mathieu Giraud's avatar
Mathieu Giraud committed
612

613
def run_fuse(id_file, id_config, id_data, sample_set_id, clean_before=True, clean_after=False):
    """Fuse the latest results of a sample set with fuse.py and store the outcome.

    For every sequence file of the sample set, the most recent results file
    of config ``id_config`` that has a data file is passed to ``fuse.py``
    together with the config's fuse command.  The fused file is stored in
    the ``fused_file`` row of (config, sample set), which is created when
    missing, and the temporary fused file is then removed.

    :param id_file: unused by this function, kept for interface compatibility
    :param id_config: id of the config whose results are fused
    :param id_data: id used to build the output folder and file names
    :param sample_set_id: id of the sample set to fuse
    :param clean_before: remove and recreate the output folder first
    :param clean_after: remove the output folder once the result is stored
    :return: "SUCCESS"
    :raises Exception: any error met while running fuse.py or opening its
                       output is logged, then re-raised
    """
    from subprocess import Popen, PIPE, STDOUT, os

    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data + '-%s' % sample_set_id

    if clean_before:
        cmd = "rm -rf "+out_folder
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()
        os.makedirs(out_folder)

    fuse_log_path = out_folder+'/'+output_filename+'.fuse.log'
    fuse_log_file = open(fuse_log_path, 'w')

    try:
        ## fuse.py
        output_file = out_folder+'/'+output_filename+'.fused'
        rows = db( ( db.results_file.sequence_file_id == db.sequence_file.id )
                 & ( db.sample_set_membership.sequence_file_id == db.sequence_file.id)
                 & ( db.sample_set_membership.sample_set_id == sample_set_id)
                 & ( db.results_file.config_id == id_config )
                 ).select( orderby=db.sequence_file.id|~db.results_file.run_date)

        # Rows are sorted by sequence file, newest result first: keep only
        # the first (most recent) result of each sequence file.
        latest = []
        last_seen = 0
        for row in rows:
            if row.sequence_file.id != last_seen:
                latest.append(row)
                last_seen = row.sequence_file.id

        with_data = [row for row in latest if row.results_file.data_file is not None]
        files = "".join(defs.DIR_RESULTS + row.results_file.data_file + " "
                        for row in with_data)
        sequence_file_list = "".join(str(row.results_file.sequence_file_id) + "_"
                                     for row in with_data)

        fuse_cmd = db.config[id_config].fuse_command
        cmd = "python "+defs.DIR_FUSE+"/fuse.py -o "+ output_file + " " + fuse_cmd + " " + files

        print("=== fuse.py ===")
        print(cmd)
        print("===============")
        sys.stdout.flush()

        p = Popen(cmd, shell=True, stdin=PIPE, stdout=fuse_log_file, stderr=STDOUT, close_fds=True)
        p.communicate()
        print("Output log in "+fuse_log_path)

        fuse_filepath = os.path.abspath(output_file)

        stream = open(fuse_filepath, 'rb')
    except Exception:
        print("!!! Fuse failed, no .fused file")
        res = {"message": "[%s] c%s: 'fuse' FAILED - %s" % (id_data, id_config, output_file)}
        log.error(res)
        raise
    finally:
        fuse_log_file.close()

    try:
        ts = time.time()

        fused_files = db( ( db.fused_file.config_id == id_config ) &
                         ( db.fused_file.sample_set_id == sample_set_id )
                     ).select()
        existing_fused_file = None
        if len(fused_files) > 0:
            fused_file = fused_files[0]
            id_fuse = fused_file.id
            existing_fused_file = fused_file.fused_file
        else:
            id_fuse = db.fused_file.insert(sample_set_id = sample_set_id,
                                           config_id = id_config)

        db.fused_file[id_fuse] = dict(fuse_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     fused_file = stream,
                                     sequence_file_list = sequence_file_list)
        db.commit()
    finally:
        # The upload has been copied by the update above (or has failed).
        stream.close()

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()
        # remove previous fused_file if it exists
        # NOTE(review): this path lies inside out_folder, which has just been
        # removed, so the command presumably never deletes anything - confirm
        # where previous fused files are actually stored.
        if existing_fused_file is not None:
            clean_cmd = "rm -rf %s/%s" % (out_folder, existing_fused_file)
            p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
            p.wait()

    res = {"message": "[%s] c%s: 'fuse' finished - %s" % (id_data, id_config, db.fused_file[id_fuse].fused_file)}
    log.info(res)

    # Remove temporary fused file; it is already gone when clean_after
    # removed the whole output folder.
    if os.path.exists(output_file):
        os.remove(output_file)

    return "SUCCESS"
708

709
def custom_fuse(file_list):
    '''
    Fuse an arbitrary list of result files through the fuse server.

    file_list holds ids of db.results_file rows (joined with ',' for the
    log message, so they are expected to be strings). Rows whose data_file
    is None are left out of the fuse command.

    The fuse.py command is sent to the XML-RPC server at
    defs.FUSE_SERVER:defs.PORT_FUSE_SERVER, the file it reports is read
    back and parsed as JSON, and the temporary output folder is removed.

    Returns the parsed content of the fused file.

    Raises IOError when defs.PORT_FUSE_SERVER is None. Any exception raised
    while calling the server or reading its result is logged and re-raised;
    in that case the output folder is left in place.
    '''
    import shutil

    if defs.PORT_FUSE_SERVER is None:
        raise IOError('This server cannot fuse custom data')

    random_id = random.randint(99999999,99999999999)
    out_folder = os.path.abspath(defs.DIR_OUT_VIDJIL_ID % random_id)
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % random_id

    # Start from an empty working folder. rmtree does not go through a
    # shell, so a path with spaces or shell metacharacters is handled as is.
    shutil.rmtree(out_folder, ignore_errors=True)
    os.makedirs(out_folder)

    res = {"message": "'custom fuse' (%d files): %s" % (len(file_list), ','.join(file_list))}
    log.info(res)

    ## fuse.py
    output_file = out_folder+'/'+output_filename+'.fused'
    files = ""
    for results_file_id in file_list:
        # Fetch the row once instead of once for the test and once for the path
        data_file = db.results_file[results_file_id].data_file
        if data_file is not None:
            files += os.path.abspath(defs.DIR_RESULTS + data_file) + " "

    try:
        cmd = "python "+ os.path.abspath(defs.DIR_FUSE) +"/fuse.py -o "+output_file+" -t 100 "+files
        proc_srvr = xmlrpclib.ServerProxy("http://%s:%d" % (defs.FUSE_SERVER, defs.PORT_FUSE_SERVER))
        fuse_filepath = proc_srvr.fuse(cmd, out_folder, output_filename)

        # Close the handle before the folder holding the file is removed
        with open(fuse_filepath, 'rb') as fused:
            data = gluon.contrib.simplejson.loads(fused.read())
    except:
        # Every failure is reported under the same message, then propagated
        res = {"message": "'custom fuse' -> IOError"}
        log.error(res)
        raise

    shutil.rmtree(out_folder, ignore_errors=True)

    res = {"message": "'custom fuse' -> finished"}
    log.info(res)

    return data
753

HERBERT Ryan's avatar
HERBERT Ryan committed
754
#TODO move this ?
755 756 757 758 759 760
def get_file_content(filename):
    '''
    Return the whole content of filename, read in binary mode.
    '''
    with open(filename, 'rb') as handle:
        return handle.read()

HERBERT Ryan's avatar
HERBERT Ryan committed
761 762 763 764 765 766 767 768 769 770 771 772 773
def fill_field(dest, value, field, parent="", append=False):
    '''
    Store value in the one-element list kept under dest[field].

    When parent is neither None nor "", the field is looked up in
    dest[parent] instead of dest. A missing field is created as [value].
    An existing field gets its first element replaced by value, or
    increased by value (+=) when append is True.
    '''
    target = dest if (parent is None or parent == "") else dest[parent]

    if field not in target:
        target[field] = [value]
        return

    slot = target[field]
    if append:
        slot[0] += value
    else:
        slot[0] = value

HERBERT Ryan's avatar
HERBERT Ryan committed
774 775 776 777
def extract_total_reads(report):
    '''
    Return, as a string, the number following the first
    "Total Reads analysed: " found in report.

    An AttributeError is raised when report holds no such line.
    '''
    matched_text = re.search("Total Reads analysed: [0-9]+", report).group()
    # The match always holds a space before the digits, keep what follows the last one
    return matched_text.rsplit(' ', 1)[1]
778 779 780 781 782 783 784 785 786 787 788 789 790 791

def schedule_pre_process(sequence_file_id, pre_process_id):
    '''
    Queue a "pre_process" scheduler task for a sequence file.

    The task is queued with args [pre_process_id, sequence_file_id] and its
    id is stored in sequence_file.pre_process_scheduler_task_id.

    Returns the dict given by assert_scheduler_task_does_not_exist when it
    reports an already registered task (nothing is queued then), otherwise
    a dict with "redirect" and "message" keys.
    '''
    args = [pre_process_id, sequence_file_id]

    # The helper compares against scheduler_task.args, hence the str() form
    err = assert_scheduler_task_does_not_exist(str(args))
    if err:
        log.error(err)
        return err

    task = scheduler.queue_task("pre_process", args,
                                repeats = 1, timeout = defs.TASK_TIMEOUT)
    db.sequence_file[sequence_file_id] = dict(pre_process_scheduler_task_id = task.id)

    res = {"redirect": "reload",
           "message": "{%s} (%s): process requested" % (sequence_file_id, pre_process_id)}

    log.info(res)
    return res


801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817
def get_preprocessed_filename(filename1, filename2):
    '''
    Get the same extension and then get the most common among them

    >>> get_preprocessed_filename('lsdkj_toto-tata_mlkmfsdlkf.fastq.gz', 'rete_toto-tata_eekjdf.fastq.gz')
    '_toto-tata_.fastq.gz'

    When filename2 is empty or None, filename1 is returned unchanged.
    When the two names, once the extension is removed, share no common
    substring, both stems are joined with '_' instead.
    '''
    if not filename2:
        return filename1
    extension = tools_utils.get_common_suffpref([filename1, filename2], min(len(filename1), len(filename2)), -1)
    # Cut with an explicit end index: x[0:-len(extension)] would give ''
    # for every name when extension is empty, since -0 == 0
    without_extension = [x[:len(x) - len(extension)] for x in [filename1, filename2]]
    common = tools_utils.common_substring(without_extension)
    if len(common) == 0:
        common = '_'.join(without_extension)
    return common+extension


818
def run_pre_process(pre_process_id, sequence_file_id, clean_before=True, clean_after=False):
Mathieu Giraud's avatar
Mathieu Giraud committed
819 820 821 822 823
    '''
    Run a pre-process on sequence_file.data_file (and possibly sequence_file.data_file+2),
    put the output back in sequence_file.data_file.
    '''

824 825
    from subprocess import Popen, PIPE, STDOUT, os
    
826
    sequence_file = db.sequence_file[sequence_file_id]
827 828 829
    db.sequence_file[sequence_file_id] = dict(pre_process_flag = "RUN")
    db.commit()
    
830
    out_folder = defs.DIR_PRE_VIDJIL_ID % sequence_file_id
831 832
    output_filename = get_preprocessed_filename(get_original_filename(sequence_file.data_file),
                                                get_original_filename(sequence_file.data_file2))
833 834 835 836 837 838 839
    
    if clean_before:
        cmd = "rm -rf "+out_folder 
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()
        os.makedirs(out_folder)    

840
    output_file = out_folder+'/'+output_filename
841 842
            
    pre_process = db.pre_process[pre_process_id]
843

844
    try:
845 846 847 848 849
        cmd = pre_process.command.replace( "&file1&", defs.DIR_SEQUENCES + sequence_file.data_file)
        if sequence_file.data_file2:
            cmd = cmd.replace( "&file2&", defs.DIR_SEQUENCES + sequence_file.data_file2)
        cmd = cmd.replace( "&result&", output_file)
        cmd = cmd.replace("&pear&", defs.DIR_PEAR)
850 851 852 853 854
        # Example of template to add some preprocess shortcut
        # cmd = cmd.replace("&preprocess_template&", defs.DIR_preprocess_template)
        # Where &preprocess_template& is the shortcut to change and
        # defs.DIR_preprocess_template the variable to set into the file defs.py. 
        # The value should be the path to access to the preprocess software.
855

Mathieu Giraud's avatar
Mathieu Giraud committed
856 857 858
        print("=== Pre-process %s ===" % pre_process_id)
        print(cmd)
        print("===============")
859
        sys.stdout.flush()
860

861 862 863 864 865 866
        out_log = out_folder+'/'+output_filename+'.pre.log'
        log_file = open(out_log, 'w')

        os.chdir(defs.DIR_FUSE)
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=log_file, close_fds=True)
        (stdoutdata, stderrdata) = p.communicate()
Mathieu Giraud's avatar
Mathieu Giraud committed
867
        print("Output log in " + out_log)
868

869
        filepath = os.path.abspath(output_file)
870 871

        stream = open(filepath, 'rb')
872
    except:
Mathieu Giraud's avatar
Mathieu Giraud committed
873
        print("!!! Pre-process failed, no result file")
874
        res = {"message": "{%s} p%s: 'pre_process' FAILED - %s" % (sequence_file_id, pre_process_id, output_file)}
875
        log.error(res)
876 877
        db.sequence_file[sequence_file_id] = dict(pre_process_flag = "FAILED")
        db.commit()
878
        raise