task.py 33.7 KB
Newer Older
1
# coding: utf8
Mathieu Giraud's avatar
Mathieu Giraud committed
2
3
from __future__ import print_function

4
import json
5
import os
6
import defs
7
import re
8
import os.path
9
10
11
import time
import sys
import datetime
12
13
import random
import xmlrpclib
14
import tools_utils
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

def assert_scheduler_task_does_not_exist(args):
    """Check whether a live scheduler task is already registered for `args`.

    A task counts as live when its status is none of FAILED, EXPIRED,
    TIMEOUT or COMPLETED.

    :param args: task arguments, compared verbatim against
        ``db.scheduler_task.args`` (callers pass ``str(args)``).
    :return: ``{"message": "task already registered"}`` when such a task
        exists, ``None`` otherwise.
    """
    finished_statuses = ("FAILED", "EXPIRED", "TIMEOUT", "COMPLETED")

    # same left-associative chain of '&' as a hand-written query
    live_task_query = (db.scheduler_task.args == args)
    for status in finished_statuses:
        live_task_query = live_task_query & (db.scheduler_task.status != status)

    if not db(live_task_query).select():
        return None
    return {"message": "task already registered"}

31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def compute_contamination(sequence_file_id, results_file_id, config_id):
    """Look for clones of each sample that are much more abundant in other
    samples of the same run.

    The three arguments are parallel lists: entry i describes one sample by
    its sequence file id, results file id and config id.  For each sample,
    the other sequence files of its "run" sample set are examined, using the
    most recent results file with the same config.  A clone of sample i is
    counted when another sample has more than 10 times as many reads (first
    read count) for the same clone id.

    :return: one dict per sample::

        {"total_clones": int, "total_reads": int,
         "sample": {other_results_file_id: {"clones": int, "reads": int}}}

        where the "reads" totals are read counts taken from sample i.
    """
    result = []
    for seq_id, res_id, conf_id in zip(sequence_file_id, results_file_id, config_id):
        entry = {"total_clones": 0, "total_reads": 0, "sample": {}}
        result.append(entry)

        # first read count of every clone of the sample being checked
        with open(defs.DIR_RESULTS + db.results_file[res_id].data_file, "r") as json_data:
            d = json.load(json_data)
        own_reads = {}
        for clone in d["clones"]:
            own_reads[clone["id"]] = clone["reads"][0]

        # iterate through the other samples of the same run
        sample_set_run = db((db.sample_set_membership.sequence_file_id == seq_id)
                            & (db.sample_set_membership.sample_set_id == db.sample_set.id)
                            & (db.sample_set.sample_type == "run")).select().first()
        if sample_set_run is None:
            continue

        rows = db((db.sample_set.id == sample_set_run.sample_set.id)
                  & (db.sample_set.id == db.sample_set_membership.sample_set_id)
                  & (db.sequence_file.id == db.sample_set_membership.sequence_file_id)
                  & (db.sequence_file.id != seq_id)
                  & (db.results_file.sequence_file_id == db.sequence_file.id)
                  & (db.results_file.config_id == conf_id)
                  ).select(db.sequence_file.ALL, db.results_file.ALL, db.sample_set.id,
                           orderby=db.sequence_file.id | ~db.results_file.run_date)

        # rows are sorted by sequence file, newest result first:
        # keep only the first (latest) row of each sequence file
        latest = []
        previous_id = None
        for row in rows:
            if row.sequence_file.id != previous_id:
                latest.append(row)
                previous_id = row.sequence_file.id

        for row in latest:
            counts = {"clones": 0, "reads": 0}
            entry["sample"][row.results_file.id] = counts

            data_file = row.results_file.data_file
            if data_file is None:
                # no output stored for this results file: nothing to compare
                continue

            path = defs.DIR_RESULTS + data_file
            print(path)
            with open(path, "r") as json_data2:
                try:
                    other = json.load(json_data2)
                except ValueError:
                    print('invalid_json')
                    continue

            for clone in other["clones"]:
                if clone["id"] not in own_reads:
                    continue
                reads = own_reads[clone["id"]]
                if clone["reads"][0] > 10 * reads:
                    entry["total_clones"] += 1
                    entry["total_reads"] += reads
                    counts["clones"] += 1
                    counts["reads"] += reads

    return result
    
94
def compute_extra(id_file, id_config, min_threshold):
    """Add a per-locus clone count to the latest results file of (id_file, id_config).

    For every locus listed in ``reads.germline`` of the results JSON, count
    the clones of that locus whose first read count is at least
    ``min_threshold`` percent of the locus' first read total.  The counts are
    stored as ``reads.distribution = {locus: [count]}`` and the JSON file is
    rewritten in place.

    :param min_threshold: threshold, in percent of the locus total.
    :return: "SUCCESS"; "FAIL" when there is no stored results file or its
        content is not valid JSON.
    """
    results_file = db((db.results_file.sequence_file_id == id_file) &
                      (db.results_file.config_id == id_config)
                      ).select(orderby=~db.results_file.run_date).first()
    if results_file is None or results_file.data_file is None:
        print('no results file')
        return "FAIL"
    filename = defs.DIR_RESULTS + results_file.data_file

    with open(filename, "rb") as rf:
        try:
            d = json.load(rf)
        except ValueError:
            print('invalid_json')
            return "FAIL"

    loci_totals = d['reads']['germline']
    result = {}
    loci_min = {}
    for locus in loci_totals:
        result[locus] = [0]
        loci_min[locus] = loci_totals[locus][0] * (min_threshold / 100.0)

    for clone in d["clones"]:
        germline = clone.get('germline')
        if germline not in loci_min:
            # no read total for this locus: the threshold cannot be applied
            continue
        if clone['reads'][0] >= loci_min[germline]:
            result[germline][0] += 1

    d['reads']['distribution'] = result
    with open(filename, 'wb') as extra:
        json.dump(d, extra)
    return "SUCCESS"
122

123

124
def schedule_run(id_sequence, id_config, grep_reads=None):
    """Create a results_file entry for (id_sequence, id_config) and queue its task.

    The queued program is ``db.config[id_config].program`` with arguments
    ``[id_sequence, id_config, results_file_id, grep_reads]``.  When the
    sequence file has a pre-process flag other than COMPLETED/DONE, the
    task is created with status STOPPED.

    :return: a dict with "redirect" and "message" on success, or the
        error dict from assert_scheduler_task_does_not_exist.
    """
    ts = time.time()
    data_id = db.results_file.insert(sequence_file_id = id_sequence,
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     config_id = id_config)

    args = [id_sequence, id_config, data_id, grep_reads]

    # NOTE(review): args contains the freshly inserted data_id, and the
    # scheduler may store args serialized differently from str(args); this
    # check can thus hardly ever match an existing task -- TODO confirm the
    # intended duplicate criterion.
    err = assert_scheduler_task_does_not_exist(str(args))
    if err:
        # do not leave an orphan results_file entry behind
        db(db.results_file.id == data_id).delete()
        log.error(err)
        return err

    program = db.config[id_config].program
    ## add task to scheduler
    task = scheduler.queue_task(program, args,
                                repeats = 1, timeout = defs.TASK_TIMEOUT)

    sequence_file = db.sequence_file[id_sequence]
    pre_process_flag = sequence_file.pre_process_flag
    if pre_process_flag and pre_process_flag not in ["COMPLETED", "DONE"]:
        # wait for the pre-process to finish before running
        db.scheduler_task[task.id] = dict(status ="STOPPED")

    db.results_file[data_id] = dict(scheduler_task_id = task.id)

    res = {"redirect": "reload",
           "message": "[%s] c%s: process requested - %s %s" % (data_id, id_config, grep_reads, sequence_file.filename)}
    log.info(res)
    return res

162
163
164
165
166
167
168
169
170
171
172
173
def schedule_fuse(sample_set_ids, config_ids):
    """Queue a single 'refuse' task for the given sample sets and configs.

    For every (sample set, config) pair with at least one results file, one
    argument list ``[sequence_file_id, config_id, results_file_id,
    sample_set_id, False]`` is collected; the trailing False is run_fuse's
    clean_before.  Nothing is queued when no pair has results.
    """
    membership = db.sample_set_membership
    fuse_args = []
    for sample_set_id in sample_set_ids:
        for config_id in config_ids:
            query = ((membership.sample_set_id == sample_set_id)
                     & (membership.sequence_file_id == db.results_file.sequence_file_id)
                     & (db.results_file.config_id == config_id))
            found = db(query).select(membership.sample_set_id, membership.sequence_file_id,
                                     db.results_file.id, db.results_file.config_id).first()
            if not found:
                continue
            fuse_args.append([found.sample_set_membership.sequence_file_id,
                              found.results_file.config_id,
                              found.results_file.id,
                              found.sample_set_membership.sample_set_id,
                              False])

    if fuse_args:
        scheduler.queue_task('refuse', [fuse_args],
                             repeats = 1, timeout = defs.TASK_TIMEOUT)
177

178
179
180
181
def schedule_compute_extra(id_file, id_config, min_threshold):
    """Queue a one-shot 'compute_extra' task for (id_file, id_config, min_threshold)."""
    scheduler.queue_task('compute_extra',
                         [id_file, id_config, min_threshold],
                         repeats=1,
                         timeout=defs.TASK_TIMEOUT)

182
def _parse_vidjil_log(out_log):
    """Summarize a vidjil-algo log as "N reads, S segmented (R%), W windows".

    Matching log lines are echoed to stdout.  Returns '' when no expected
    line is found, and stops at the first "found ... windows" line.
    """
    segmented = re.compile(r"==> segmented (\d+) reads \((\d*\.\d+|\d+)%\)")
    windows = re.compile(r"==> found (\d+) .*-windows in .* segments .* inside (\d+) sequences")
    info = ''
    with open(out_log) as log_lines:
        for l in log_lines:
            m = segmented.search(l)
            if m:
                print(l, end=' ')
                info = "%d segmented (%s%%)" % (int(m.group(1)), m.group(2))
                continue
            m = windows.search(l)
            if m:
                print(l, end=' ')
                info = "%d reads, " % int(m.group(2)) + info + ", %d windows" % int(m.group(1))
                break
    return info


def run_vidjil(id_file, id_config, id_data, grep_reads,
               clean_before=False, clean_after=False):
    """Run vidjil-algo on sequence file `id_file` and store the output in results_file `id_data`.

    - If the sequence file pre-process FAILED, raises ValueError.
    - If its pre-process is still pending (flag set, not COMPLETED/DONE), the
      same task is re-queued 1200 s later and "SUCCESS" is returned.
    - Otherwise vidjil-algo is run with ``db.config[id_config].command`` in a
      freshly emptied output folder; the result file is stored with status
      "ready".  Unless `grep_reads` is set, compute_extra (5% threshold) and
      run_fuse are then run for every sample set containing the file.

    :param grep_reads: optional sequence passed to vidjil-algo as ``-FaW``;
        the stored result is then ``seq/clone.fa-1``.
    :param clean_before: unused; the output folder is always emptied first.
    :param clean_after: remove the output folder once the result is stored.
    :raises ValueError: when the pre-process has failed.
    :raises: re-raises any error while running vidjil or opening its result.
    """
    import shutil
    from subprocess import Popen, PIPE, STDOUT, os
    from datetime import timedelta as timed
    try:
        from shlex import quote as shell_quote
    except ImportError:  # Python 2
        from pipes import quote as shell_quote

    sequence_file = db.sequence_file[id_file]
    pre_process_flag = sequence_file.pre_process_flag

    # checked first: "FAILED" is also a non-COMPLETED/DONE flag and would
    # otherwise be re-scheduled forever by the pending branch below
    if pre_process_flag == "FAILED":
        print("Pre-process has failed")
        raise ValueError('pre-process has failed')

    ## re-schedule if pre_process is still pending
    if pre_process_flag and pre_process_flag not in ["COMPLETED", "DONE"]:
        print("Pre-process is still pending, re-schedule")
        args = [id_file, id_config, id_data, grep_reads]
        task = scheduler.queue_task("vidjil", args,
                                    repeats = 1, timeout = defs.TASK_TIMEOUT,
                                    start_time=request.now + timed(seconds=1200))
        db.results_file[id_data] = dict(scheduler_task_id = task.id)
        db.commit()
        print(task.id)
        sys.stdout.flush()
        return "SUCCESS"

    ## paths to vidjil / to the sequence files
    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    shutil.rmtree(out_folder, ignore_errors=True)

    ## sequence file path
    filename = sequence_file.data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = upload_folder + filename

    ## vidjil config
    vidjil_cmd = db.config[id_config].command
    if 'next' in vidjil_cmd:
        vidjil_cmd = vidjil_cmd.replace('next', '')
        vidjil_cmd = vidjil_cmd.replace(' germline' , defs.DIR_GERMLINE_NEXT)
        cmd = defs.DIR_VIDJIL_NEXT + '/vidjil-algo '
    else:
        vidjil_cmd = vidjil_cmd.replace(' germline' , defs.DIR_GERMLINE)
        cmd = defs.DIR_VIDJIL + '/vidjil-algo '

    if grep_reads:
        # grep_reads ends up in a shell command line: quote it
        vidjil_cmd += ' -FaW %s ' % shell_quote(grep_reads)

    os.makedirs(out_folder)
    out_log = out_folder+'/'+output_filename+'.vidjil.log'

    ## full command
    cmd += ' -o  ' + out_folder + " -b " + output_filename
    cmd += ' ' + vidjil_cmd + ' '+ seq_file

    ## result file
    if grep_reads:
        out_results = out_folder + '/seq/clone.fa-1'
    else:
        out_results = out_folder + '/' + output_filename + '.vidjil'
    results_filepath = os.path.abspath(out_results)

    with open(out_log, 'w') as vidjil_log_file:
        try:
            ## run vidjil
            print("=== Launching Vidjil ===")
            print(cmd)
            print("========================")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=vidjil_log_file, stderr=STDOUT, close_fds=True)
            p.communicate()

            print("Output log in " + out_log)
            sys.stdout.flush()
            db.commit()

            print("===>", out_results)
            # fail here if vidjil did not produce its result file
            open(results_filepath, 'rb').close()
        except Exception:
            print("!!! Vidjil failed, no result file")
            res = {"message": "[%s] c%s: Vidjil FAILED - %s" % (id_data, id_config, out_folder)}
            log.error(res)
            raise

    ## parse some info in the .log
    info = _parse_vidjil_log(out_log)

    ## store the result in the database
    ts = time.time()
    with open(results_filepath, 'rb') as stream:
        db.results_file[id_data] = dict(status = "ready",
                                        run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file = stream
                                        )
    db.commit()

    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)

    res = {"message": "[%s] c%s: Vidjil finished - %s - %s" % (id_data, id_config, info, out_folder)}
    log.info(res)

    if not grep_reads:
        memberships = db(db.sample_set_membership.sequence_file_id==id_file).select()
        if memberships:
            # the distribution only depends on (id_file, id_config): compute it once
            compute_extra(id_file, id_config, 5)
        for row in memberships:
            print(row.sample_set_id)
            run_fuse(id_file, id_config, id_data, row.sample_set_id, clean_before = False)

    return "SUCCESS"
335

Alexander Shlemov's avatar
Alexander Shlemov committed
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
def run_igrec(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Run IgReC on sequence file `id_file` and store its output in results_file `id_data`.

    IgReC (``defs.DIR_IGREC/igrec.py``) is run with
    ``db.config[id_config].command`` in a freshly emptied output folder.
    ``out/igrec.vidjil`` is annotated with the original file name and the
    command line, stored with status "ready", and run_fuse is then run for
    every sample set containing the file.

    :param clean_before: unused; the output folder is always emptied first.
    :param clean_after: unused.
    :raises: re-raises any error while running IgReC or opening its result.
    """
    import shutil
    from subprocess import Popen, PIPE, STDOUT, os

    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    shutil.rmtree(out_folder, ignore_errors=True)

    ## sequence file path
    filename = db.sequence_file[id_file].data_file
    seq_file = upload_folder + filename

    ## IgReC arguments
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)
    out_log = out_folder + '/vidjil-igrec.log'
    out_results = out_folder + "/out/igrec.vidjil"
    results_filepath = os.path.abspath(out_results)

    igrec = defs.DIR_IGREC + '/igrec.py'
    ## full command
    cmd = "%s -s %s -o %s/out %s" % (igrec, seq_file, out_folder, arg_cmd)

    with open(out_log, 'w') as log_file:
        try:
            if not os.path.isfile(igrec):
                print("!!! IgReC binary file not found")

            ## run IgReC
            print("=== Launching IgReC ===")
            print(cmd)
            print("========================")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
            p.wait()

            print("Output log in " + out_log)
            sys.stdout.flush()

            print("===>", out_results)
            # fail here if IgReC did not produce its result file
            open(results_filepath, 'rb').close()
        except Exception:
            print("!!! IgReC failed, no result file")
            res = {"message": "[%s] c%s: IgReC FAILED - %s" % (id_data, id_config, out_folder)}
            log.error(res)
            raise

    # read, annotate, then rewrite the result once the reading handle is closed
    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)
    fill_field(my_json, filename, "original_names", "samples")
    fill_field(my_json, cmd, "commandline", "samples")
    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## store the result in the database
    ts = time.time()
    with open(results_filepath, 'rb') as stream:
        db.results_file[id_data] = dict(status = "ready",
                                        run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file = stream
                                        )
    db.commit()

    res = {"message": "[%s] c%s: IgReC - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    for row in db(db.sample_set_membership.sequence_file_id==id_file).select():
        print(row.sample_set_id)
        run_fuse(id_file, id_config, id_data, row.sample_set_id, clean_before = False)

    return "SUCCESS"

427
def run_mixcr(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Run MiXCR (align, assemble, exportClones) on sequence file `id_file`
    and store the output in results_file `id_data`.

    ``db.config[id_config].command`` is expected as
    ``args_align | args_assemble | args_exportClones``; any other shape is
    reported and the three argument sets are left empty.  The exported
    .mixcr file is annotated with the MiXCR reports, original file name,
    command line and total reads, stored with status "ready", and run_fuse
    is then run for every sample set containing the file.

    :param clean_before: unused; the output folder is always emptied first.
    :param clean_after: unused.
    :raises: re-raises any error while running MiXCR or opening its result.
    """
    import shutil
    from subprocess import Popen, PIPE, STDOUT, os

    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    shutil.rmtree(out_folder, ignore_errors=True)

    ## sequence file path
    filename = db.sequence_file[id_file].data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = upload_folder + filename

    ## MiXCR arguments
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)
    # os.path.join: the files must go inside out_folder whether or not its
    # name ends with a separator
    out_log = os.path.join(out_folder, output_filename + '.mixcr.log')
    report = os.path.join(out_folder, output_filename + '.mixcr.report')
    out_alignments = os.path.join(out_folder, output_filename + '.align.vdjca')
    out_clones = os.path.join(out_folder, output_filename + '.clones.clns')
    out_results = os.path.join(out_folder, output_filename + '.mixcr')
    results_filepath = os.path.abspath(out_results)

    align_report = report + '.aln'
    assembly_report = report + '.asmbl'

    arg_cmds = arg_cmd.split('|')
    if len(arg_cmds) == 3:
        args_1, args_2, args_3 = arg_cmds
    else:
        args_1, args_2, args_3 = '', '', ''
        print(arg_cmd)
        print("! Bad arguments, we expect args_align | args_assemble | args_exportClones")

    with open(out_log, 'w') as log_file:
        try:
            ## full command
            mixcr = defs.DIR_MIXCR + 'mixcr'
            cmd = mixcr + ' align --save-reads -t 1 -r ' + align_report + ' ' + args_1 + ' ' + seq_file  + ' ' + out_alignments
            cmd += ' && '
            cmd += mixcr + ' assemble -t 1 -r ' + assembly_report + ' ' + args_2 + ' ' + out_alignments + ' ' + out_clones
            cmd += ' && rm ' + out_alignments
            cmd += ' && '
            cmd += mixcr + ' exportClones --format vidjil -germline -id -name -reads -sequence -top -seg -s ' + args_3 + ' ' + out_clones + ' ' + out_results

            ## run MiXCR
            print("=== Launching MiXCR ===")
            print(cmd)
            print("========================")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
            p.wait()

            print("Output log in " + out_log)
            sys.stdout.flush()

            print("===>", out_results)
            # fail here if MiXCR did not produce its result file
            open(results_filepath, 'rb').close()
        except Exception:
            print("!!! MiXCR failed, no result file")
            res = {"message": "[%s] c%s: MiXCR FAILED - %s" % (id_data, id_config, out_folder)}
            log.error(res)
            raise

    align_report_content = get_file_content(align_report)
    assembly_report_content = get_file_content(assembly_report)
    reports = align_report_content + assembly_report_content
    total_reads = extract_total_reads(assembly_report_content)

    # read, annotate, then rewrite the result once the reading handle is closed
    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)
    fill_field(my_json, reports, "log", "samples", True)
    fill_field(my_json, filename, "original_names", "samples")
    fill_field(my_json, cmd, "commandline", "samples")
    fill_field(my_json, total_reads, "total", "reads")
    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## store the result in the database
    ts = time.time()
    with open(results_filepath, 'rb') as stream:
        db.results_file[id_data] = dict(status = "ready",
                                        run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file = stream
                                        )
    db.commit()

    res = {"message": "[%s] c%s: MiXCR finished - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    for row in db(db.sample_set_membership.sequence_file_id==id_file).select():
        print(row.sample_set_id)
        run_fuse(id_file, id_config, id_data, row.sample_set_id, clean_before = False)

    return "SUCCESS"
541

542
def run_copy(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Store the uploaded sequence file `id_file` unchanged as results_file `id_data`.

    The output folder is emptied and an empty .vidjil.log is created in it.
    The sequence file is then stored (under its original filename) with
    status "ready", and run_fuse is run for every sample set containing it.

    :param clean_before: unused; the output folder is always emptied first.
    :param clean_after: remove the output folder once the result is stored.
    :raises IOError: when the sequence file cannot be opened.
    """
    import os
    import shutil

    ## paths to the sequence files
    upload_folder = defs.DIR_SEQUENCES
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    shutil.rmtree(out_folder, ignore_errors=True)

    ## sequence file path
    sequence_file = db.sequence_file[id_file]
    filename = sequence_file.data_file

    os.makedirs(out_folder)
    out_log = out_folder+'/'+output_filename+'.vidjil.log'
    # nothing is logged for a copy; the (empty) log file is still created
    open(out_log, 'w').close()

    print("Output log in "+out_log)
    sys.stdout.flush()
    db.commit()

    ## fetch the file
    results_filepath = os.path.abspath(upload_folder + filename)
    try:
        stream = open(results_filepath, 'rb')
    except IOError:
        print("!!! 'copy' failed, no file")
        res = {"message": "[%s] c%s: 'copy' FAILED - %s - %s" % (id_data, id_config, filename, out_folder)}
        log.error(res)
        raise

    ## store the result in the database
    ts = time.time()
    try:
        db.results_file[id_data] = dict(status = "ready",
                                        run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file = db.results_file.data_file.store(stream, sequence_file.filename)
                                        )
    finally:
        stream.close()
    db.commit()

    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)

    res = {"message": "[%s] c%s: 'copy' finished - %s" % (id_data, id_config, filename)}
    log.info(res)

    for row in db(db.sample_set_membership.sequence_file_id==id_file).select():
        print(row.sample_set_id)
        run_fuse(id_file, id_config, id_data, row.sample_set_id, clean_before = False)

    return "SUCCESS"
Mathieu Giraud's avatar
Mathieu Giraud committed
604
605


606
607
608
609
def run_refuse(args):
    """Call run_fuse once per entry of `args`.

    Each entry is ``[id_file, id_config, id_data, sample_set_id, clean_before]``
    (as built by schedule_fuse); extra elements are ignored.
    """
    for entry in args:
        run_fuse(entry[0], entry[1], entry[2], entry[3], clean_before=entry[4])
    return "SUCCESS"
Mathieu Giraud's avatar
Mathieu Giraud committed
610

611
def run_fuse(id_file, id_config, id_data, sample_set_id, clean_before=True, clean_after=False):
    """Fuse the results of a sample set for one config into its fused_file entry.

    For every sequence file of `sample_set_id`, the most recent results file
    with config `id_config` is taken; sequence files whose latest result has
    no stored data are skipped.  fuse.py is run with
    ``db.config[id_config].fuse_command`` and the output is stored in the
    fused_file row for (sample_set_id, id_config), which is created if needed.

    :param id_file: unused.
    :param id_data: only used to name the working folder and files.
    :param clean_before: empty and re-create the working folder first.
    :param clean_after: remove the working folder at the end.
    :raises: re-raises any error while running fuse.py or opening its output.
    """
    import shutil
    from subprocess import Popen, PIPE, STDOUT, os

    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data + '-%s' % sample_set_id

    if clean_before:
        shutil.rmtree(out_folder, ignore_errors=True)
        os.makedirs(out_folder)

    out_log = out_folder+'/'+output_filename+'.fuse.log'
    output_file = out_folder+'/'+output_filename+'.fused'

    rows = db((db.results_file.sequence_file_id == db.sequence_file.id)
              & (db.sample_set_membership.sequence_file_id == db.sequence_file.id)
              & (db.sample_set_membership.sample_set_id == sample_set_id)
              & (db.results_file.config_id == id_config)
              ).select(orderby=db.sequence_file.id|~db.results_file.run_date)

    # rows are sorted by sequence file, newest result first:
    # keep only the first (latest) row of each sequence file
    latest = []
    previous_id = None
    for row in rows:
        if row.sequence_file.id != previous_id:
            latest.append(row)
            previous_id = row.sequence_file.id
    latest = [row for row in latest if row.results_file.data_file is not None]

    files = "".join(defs.DIR_RESULTS + row.results_file.data_file + " " for row in latest)
    sequence_file_list = "".join(str(row.results_file.sequence_file_id) + "_" for row in latest)

    with open(out_log, 'w') as fuse_log_file:
        try:
            ## fuse.py
            fuse_cmd = db.config[id_config].fuse_command
            cmd = "python "+defs.DIR_FUSE+"/fuse.py -o "+ output_file + " " + fuse_cmd + " " + files

            print("=== fuse.py ===")
            print(cmd)
            print("===============")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=fuse_log_file, stderr=STDOUT, close_fds=True)
            p.communicate()
            print("Output log in "+out_log)

            fuse_filepath = os.path.abspath(output_file)
            # fail here if fuse.py did not produce its output
            open(fuse_filepath, 'rb').close()
        except Exception:
            print("!!! Fuse failed, no .fused file")
            res = {"message": "[%s] c%s: 'fuse' FAILED - %s" % (id_data, id_config, output_file)}
            log.error(res)
            raise

    ts = time.time()

    fused_files = db((db.fused_file.config_id == id_config) &
                     (db.fused_file.sample_set_id == sample_set_id)
                     ).select()
    existing_fused_file = None
    if fused_files:
        id_fuse = fused_files[0].id
        existing_fused_file = fused_files[0].fused_file
    else:
        id_fuse = db.fused_file.insert(sample_set_id = sample_set_id,
                                       config_id = id_config)

    with open(fuse_filepath, 'rb') as stream:
        db.fused_file[id_fuse] = dict(fuse_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                      fused_file = stream,
                                      sequence_file_list = sequence_file_list)
    db.commit()

    # Remove the temporary fused file; this must happen before clean_after,
    # which deletes the whole folder (os.remove would then fail)
    os.remove(output_file)

    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)
        # remove previous fused_file if it exists
        # NOTE(review): stored fused files presumably live in the DAL upload
        # folder rather than in out_folder -- this path looks wrong; TODO confirm.
        if existing_fused_file is not None:
            stale_file = "%s/%s" % (out_folder, existing_fused_file)
            if os.path.isfile(stale_file):
                os.remove(stale_file)

    res = {"message": "[%s] c%s: 'fuse' finished - %s" % (id_data, id_config, db.fused_file[id_fuse].fused_file)}
    log.info(res)

    return "SUCCESS"
706

707
def custom_fuse(file_list):
    '''
    Fuse the results files whose ids are given in file_list and return the
    decoded fused data.

    The fuse.py command line is built here, but it is executed by the fuse
    server reached over XML-RPC at defs.FUSE_SERVER:defs.PORT_FUSE_SERVER.
    The server's fuse() call returns the path of the produced .fused file,
    which is read and decoded as JSON. The temporary working folder is
    removed after a successful fuse.

    :param file_list: ids of db.results_file rows (joined with ',' for
                      logging, so they are expected to be strings); rows
                      whose data_file is None are skipped.
    :returns: the decoded content of the fused file.
    :raises IOError: when no fuse server port is configured.
    Any error raised while fusing or reading the result is logged and re-raised.
    '''
    from subprocess import Popen, PIPE, STDOUT, os

    if defs.PORT_FUSE_SERVER is None:
        raise IOError('This server cannot fuse custom data')

    # A random id gives this custom fuse its own working folder / base name
    random_id = random.randint(99999999, 99999999999)
    out_folder = os.path.abspath(defs.DIR_OUT_VIDJIL_ID % random_id)
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % random_id

    # Start from an empty working folder
    cmd = "rm -rf " + out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()
    os.makedirs(out_folder)

    res = {"message": "'custom fuse' (%d files): %s" % (len(file_list), ','.join(file_list))}
    log.info(res)

    ## fuse.py
    output_file = out_folder + '/' + output_filename + '.fused'
    input_paths = []
    for results_file_id in file_list:
        results_file = db.results_file[results_file_id]
        if results_file.data_file is not None:
            input_paths.append(os.path.abspath(defs.DIR_RESULTS + results_file.data_file))
    files = " ".join(input_paths)

    try:
        cmd = "python " + os.path.abspath(defs.DIR_FUSE) + "/fuse.py -o " + output_file + " -t 100 " + files
        proc_srvr = xmlrpclib.ServerProxy("http://%s:%d" % (defs.FUSE_SERVER, defs.PORT_FUSE_SERVER))
        fuse_filepath = proc_srvr.fuse(cmd, out_folder, output_filename)

        # Close the fused file as soon as it has been decoded
        with open(fuse_filepath, 'rb') as f:
            data = gluon.contrib.simplejson.loads(f.read())
    except Exception:
        res = {"message": "'custom fuse' -> IOError"}
        log.error(res)
        raise

    clean_cmd = "rm -rf " + out_folder
    p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    res = {"message": "'custom fuse' -> finished"}
    log.info(res)

    return data
751

HERBERT Ryan's avatar
HERBERT Ryan committed
752
#TODO move this ?
def get_file_content(filename):
    '''
    Read the whole file *filename* in binary mode and return its content.
    '''
    with open(filename, 'rb') as handle:
        return handle.read()

HERBERT Ryan's avatar
HERBERT Ryan committed
759
760
761
762
763
764
765
766
767
768
769
770
771
def fill_field(dest, value, field, parent="", append=False):
    '''
    Store value as the first element of the list dest[field].

    When parent is neither None nor "", dest[parent] is used as the target
    dict instead of dest. If the field does not exist yet, it is created as
    [value]. Otherwise its first element is replaced by value, or extended
    with `+= value` when append is True. dest is modified in place.
    '''
    target = dest if (parent is None or parent == "") else dest[parent]

    if field not in target:
        target[field] = [value]
        return

    if append:
        target[field][0] += value
    else:
        target[field][0] = value

HERBERT Ryan's avatar
HERBERT Ryan committed
772
773
774
775
def extract_total_reads(report):
    '''
    Return, as a string, the number that follows the first occurrence of
    "Total Reads analysed: " in report.

    An AttributeError is raised when report contains no such line.
    '''
    found = re.search(r"Total Reads analysed: ([0-9]+)", report)
    return found.group(1)
776
777
778
779
780
781
782
783
784
785
786
787
788
789

def schedule_pre_process(sequence_file_id, pre_process_id):
    '''
    Queue a "pre_process" scheduler task for the given sequence file.

    Nothing is queued when a scheduler task with the same arguments already
    exists in a non-terminal state (anything other than FAILED, EXPIRED,
    TIMEOUT or COMPLETED). In that case the error dict returned by
    assert_scheduler_task_does_not_exist is logged and returned.

    Otherwise the id of the queued task is stored in
    sequence_file.pre_process_scheduler_task_id, and a dict with "redirect"
    and "message" keys is logged and returned.
    '''
    args = [pre_process_id, sequence_file_id]

    # Tasks are matched on their serialized argument list
    err = assert_scheduler_task_does_not_exist(str(args))
    if err:
        log.error(err)
        return err

    task = scheduler.queue_task("pre_process", args,
                                repeats = 1, timeout = defs.TASK_TIMEOUT)

    db.sequence_file[sequence_file_id] = dict(pre_process_scheduler_task_id = task.id)

    res = {"redirect": "reload",
           "message": "{%s} (%s): process requested" % (sequence_file_id, pre_process_id)}

    log.info(res)
    return res


799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
def get_preprocessed_filename(filename1, filename2):
    '''
    Build a name for the output of a pre-process run on one or two files.

    When there is no second file, filename1 is returned unchanged.
    Otherwise the ending shared by both names is taken as the extension
    (as returned by tools_utils.get_common_suffpref, presumably the common
    file extension). The common substring of the remaining parts
    (tools_utils.common_substring) is then prepended to it. If the
    remaining parts have no common substring, they are joined with '_'.

    >>> get_preprocessed_filename('lsdkj_toto-tata_mlkmfsdlkf.fastq.gz', 'rete_toto-tata_eekjdf.fastq.gz')
    '_toto-tata_.fastq.gz'
    '''
    if not filename2:
        return filename1
    extension = tools_utils.get_common_suffpref([filename1, filename2], min(len(filename1), len(filename2)), -1)
    # Strip the extension. With an empty extension, x[:-0] would be '' and
    # the whole name would be lost, so the names are kept as they are.
    without_extension = [x[:-len(extension)] if extension else x
                         for x in (filename1, filename2)]
    common = tools_utils.common_substring(without_extension)
    if not common:
        common = '_'.join(without_extension)
    return common + extension


816
def run_pre_process(pre_process_id, sequence_file_id, clean_before=True, clean_after=False):
    '''
    Run a pre-process on sequence_file.data_file (and possibly sequence_file.data_file2),
    put the output back in sequence_file.data_file.

    The command of db.pre_process[pre_process_id] is a template. Its
    placeholders &file1&, &file2& (only when a second file exists),
    &result& and &pear& are substituted. The command then runs in a shell
    from defs.DIR_FUSE, with stdout/stderr written to '<output>.pre.log' in
    the output folder.

    sequence_file.pre_process_flag is set to "RUN", then to "COMPLETED".
    If any step of the run fails, including opening the result file, it is
    set to "FAILED" and the exception is re-raised. On success, data_file2
    is cleared, and scheduler tasks of this sequence file in STOPPED status
    are set back to QUEUED.

    :param clean_before: wipe and recreate the output folder before running
    :param clean_after: remove the output folder once done
    :returns: "SUCCESS"
    '''
    from subprocess import Popen, PIPE, STDOUT, os

    sequence_file = db.sequence_file[sequence_file_id]

    db.sequence_file[sequence_file_id] = dict(pre_process_flag = "RUN")
    db.commit()

    out_folder = defs.DIR_PRE_VIDJIL_ID % sequence_file_id
    output_filename = get_preprocessed_filename(get_original_filename(sequence_file.data_file),
                                                get_original_filename(sequence_file.data_file2))

    if clean_before:
        cmd = "rm -rf " + out_folder
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()
        os.makedirs(out_folder)

    output_file = out_folder + '/' + output_filename

    pre_process = db.pre_process[pre_process_id]

    # Set before the try so the failure path knows whether it must close it
    log_file = None
    try:
        cmd = pre_process.command.replace( "&file1&", defs.DIR_SEQUENCES + sequence_file.data_file)
        if sequence_file.data_file2:
            cmd = cmd.replace( "&file2&", defs.DIR_SEQUENCES + sequence_file.data_file2)
        cmd = cmd.replace( "&result&", output_file)
        cmd = cmd.replace("&pear&", defs.DIR_PEAR)

        # Example of template to add some preprocess shortcut
        # cmd = cmd.replace("&preprocess_template&", defs.DIR_preprocess_template)
        # Where &preprocess_template& is the shortcut to change and
        # defs.DIR_preprocess_template the variable to set into the file defs.py.
        # The value should be the path to access to the preprocess software.

        print("=== Pre-process %s ===" % pre_process_id)
        print(cmd)
        print("===============")
        sys.stdout.flush()

        out_log = out_folder + '/' + output_filename + '.pre.log'
        log_file = open(out_log, 'w')

        os.chdir(defs.DIR_FUSE)
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=log_file, close_fds=True)
        p.communicate()
        print("Output log in " + out_log)

        filepath = os.path.abspath(output_file)

        # A missing result file is how a failed pre-process is detected
        stream = open(filepath, 'rb')
    except:
        # Bare except on purpose: whatever aborted the run, the sequence file
        # must not stay flagged "RUN"; the exception is re-raised below.
        if log_file is not None:
            log_file.close()
        print("!!! Pre-process failed, no result file")
        res = {"message": "{%s} p%s: 'pre_process' FAILED - %s" % (sequence_file_id, pre_process_id, output_file)}
        log.error(res)

        db.sequence_file[sequence_file_id] = dict(pre_process_flag = "FAILED")
        db.commit()
        raise

    # Now we update the sequence file with the result of the pre-process
    # We forget the initial data_file (and possibly data_file2)
    db.sequence_file[sequence_file_id] = dict(data_file = stream,
                                              data_file2 = None,
                                              pre_process_flag = "COMPLETED")
    db.commit()
    # The upload field has stored the content by now; release the handle
    stream.close()

    #resume STOPPED task for this sequence file
    stopped_task = db(
        (db.results_file.sequence_file_id == sequence_file_id)
        & (db.results_file.scheduler_task_id == db.scheduler_task.id)
        & (db.scheduler_task.status == "STOPPED")
    ).select()

    for row in stopped_task :
        db.scheduler_task[row.scheduler_task.id] = dict(status = "QUEUED")

    db.commit()

    # Dump log in scheduler_run.run_output
    log_file.close()
    with open(out_log) as log_content:
        for l in log_content:
            print(l, end=' ')

    # Remove data file from disk to save space (it is now saved elsewhere)
    os.remove(filepath)

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

    res = {"message": "{%s} p%s: 'pre_process' finished - %s" % (sequence_file_id, pre_process_id, output_file)}
    log.info(res)

    return "SUCCESS"
    
HERBERT Ryan's avatar
HERBERT Ryan committed
917
    
918
from gluon.scheduler import Scheduler

# Task registry of the web2py scheduler. Each key is a task name as passed to
# scheduler.queue_task(...) (e.g. "pre_process"); each value is the function
# that the scheduler worker runs for that task.
scheduler = Scheduler(
    db,
    {
        'vidjil': run_vidjil,
        'compute_contamination': compute_contamination,
        'compute_extra': compute_extra,
        'mixcr': run_mixcr,
        'igrec': run_igrec,
        'none': run_copy,
        'pre_process': run_pre_process,
        'refuse': run_refuse,
    },
    heartbeat=defs.SCHEDULER_HEARTBEAT,
)