task.py 34.1 KB
Newer Older
1
# coding: utf8
Mathieu Giraud's avatar
Mathieu Giraud committed
2
3
from __future__ import print_function

4
import json
5
import os
6
import defs
7
import re
8
import os.path
9
10
11
import time
import sys
import datetime
12
13
import random
import xmlrpclib
14
import tools_utils
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

def assert_scheduler_task_does_not_exist(args):
    """Report whether a still-active scheduler task already uses *args*.

    A task is considered active when its status is none of FAILED, EXPIRED,
    TIMEOUT or COMPLETED.

    :param args: serialized task arguments, compared verbatim with
                 ``scheduler_task.args``.
    :returns: ``{"message": "task already registered"}`` when such a task
              exists, ``None`` otherwise.
    """
    finished_states = ("FAILED", "EXPIRED", "TIMEOUT", "COMPLETED")

    ## check scheduled run
    query = (db.scheduler_task.args == args)
    for state in finished_states:
        query &= (db.scheduler_task.status != state)

    active_tasks = db(query).select()
    if not len(active_tasks) > 0:
        return None
    return {"message": "task already registered"}

31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def _clone_reads(data):
    """Return {clone id: first "reads" value} for the clones of a parsed result file.

    A missing or null "clones" entry yields an empty dict.
    """
    return {clone["id"]: clone["reads"][0] for clone in data.get("clones") or []}


def _latest_result_rows(rows):
    """Keep only the first row of each consecutive group of equal sequence_file ids.

    With rows ordered by sequence_file.id then ~results_file.run_date, this is
    the most recent result of each sequence file.
    """
    latest = []
    previous_id = None
    for row in rows:
        if row.sequence_file.id != previous_id:
            latest.append(row)
            previous_id = row.sequence_file.id
    return latest


def compute_contamination(sequence_file_id, results_file_id, config_id):
    """Estimate cross-sample contamination for each requested sample.

    For the i-th sample (``sequence_file_id[i]`` analysed in
    ``results_file_id[i]`` with ``config_id[i]``), every other sequence file
    of the same "run" sample set is inspected through its most recent result
    for the same config. A clone of the i-th sample is counted when the other
    sample holds the same clone id with more than 10 times as many reads.

    :param sequence_file_id: list of sequence_file ids
    :param results_file_id: list of results_file ids, parallel to sequence_file_id
    :param config_id: list of config ids, parallel to sequence_file_id
    :returns: one dict per sample with "total_clones", "total_reads" and
              "sample" (results_file id -> {"clones": int, "reads": int}).
    """
    result = []
    for i, seq_id in enumerate(sequence_file_id):
        summary = {"total_clones": 0, "total_reads": 0, "sample": {}}
        result.append(summary)

        # reads per clone in the sample under study
        own_path = defs.DIR_RESULTS + db.results_file[results_file_id[i]].data_file
        with open(own_path, "r") as json_data:
            own_reads = _clone_reads(json.load(json_data))

        # the "run" sample set this sequence file belongs to, if any
        sample_set_run = db((db.sample_set_membership.sequence_file_id == seq_id)
                            & (db.sample_set_membership.sample_set_id == db.sample_set.id)
                            & (db.sample_set.sample_type == "run")).select().first()
        if sample_set_run is None:
            continue

        rows = db((db.sample_set.id == sample_set_run.sample_set.id)
                  & (db.sample_set.id == db.sample_set_membership.sample_set_id)
                  & (db.sequence_file.id == db.sample_set_membership.sequence_file_id)
                  & (db.sequence_file.id != seq_id)
                  & (db.results_file.sequence_file_id == db.sequence_file.id)
                  & (db.results_file.config_id == config_id[i])
                  ).select(db.sequence_file.ALL, db.results_file.ALL, db.sample_set.id,
                           orderby=db.sequence_file.id | ~db.results_file.run_date)

        for row in _latest_result_rows(rows):
            other_path = defs.DIR_RESULTS + row.results_file.data_file
            print(other_path)
            stats = {"clones": 0, "reads": 0}
            summary["sample"][row.results_file.id] = stats

            with open(other_path, "r") as json_data2:
                try:
                    other = json.load(json_data2)
                except ValueError:
                    print('invalid_json')
                    continue

            for clone in other.get("clones") or []:
                clone_id = clone["id"]
                if clone_id in own_reads and clone["reads"][0] > 10 * own_reads[clone_id]:
                    summary["total_clones"] += 1
                    summary["total_reads"] += own_reads[clone_id]
                    stats["clones"] += 1
                    stats["reads"] += own_reads[clone_id]

    return result
    
94
def compute_extra(id_file, id_config, min_threshold):
    """Add a per-locus clone-count distribution to the latest result file.

    Loads the most recent results_file (by run_date) for (id_file, id_config).
    For each locus listed in ``reads.germline`` it counts the clones whose
    first reads value is at least ``min_threshold`` percent of that locus'
    first total. It stores the counts under ``reads.distribution``
    (locus -> [count]) and rewrites the file in place.

    :param min_threshold: threshold, in percent of the locus total
    :returns: "SUCCESS", or "FAIL" when there is no result file, it is not
              valid JSON, or it has no "reads" section.
    """
    results_file = db((db.results_file.sequence_file_id == id_file) &
                      (db.results_file.config_id == id_config)
                      ).select(orderby=~db.results_file.run_date).first()
    if results_file is None or results_file.data_file is None:
        print('no results file')
        return "FAIL"
    filename = defs.DIR_RESULTS + results_file.data_file

    with open(filename, "rb") as rf:
        try:
            d = json.load(rf)
        except ValueError:
            print('invalid_json')
            return "FAIL"

    if 'reads' not in d:
        # nowhere to store the distribution without altering the file's shape
        print('no reads')
        return "FAIL"

    result = {}
    loci_min = {}
    for locus, totals in d['reads'].get('germline', {}).items():
        result.setdefault(locus, [0])
        loci_min[locus] = totals[0] * (min_threshold / 100.0)

    for clone in d.get('clones') or []:
        germline = clone.get('germline')
        # a clone on a locus without germline totals cannot be thresholded
        if germline not in loci_min:
            continue
        if clone['reads'][0] >= loci_min[germline]:
            result[germline][0] += 1

    d['reads']['distribution'] = result
    # text mode: json.dump writes str; 'w' works on both Python 2 and 3
    with open(filename, 'w') as extra:
        json.dump(d, extra)
    return "SUCCESS"
124

125

126
def schedule_run(id_sequence, id_config, grep_reads=None):
    """Create a results_file row and queue the config's program for it.

    The queued task receives [id_sequence, id_config, data_id, grep_reads].
    It is set to STOPPED when the sequence file has a pre_process_flag other
    than COMPLETED/DONE.

    :param grep_reads: optional value forwarded to the task arguments
    :returns: {"redirect", "processId", "message"} on success, or the error
              dict returned by assert_scheduler_task_does_not_exist.
    """
    ts = time.time()
    data_id = db.results_file.insert(sequence_file_id = id_sequence,
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     config_id = id_config)

    args = [id_sequence, id_config, data_id, grep_reads]

    # NOTE(review): args embed the freshly inserted data_id, so this check can
    # only match a task that already targets this very results_file row.
    err = assert_scheduler_task_does_not_exist(str(args))
    if err:
        log.error(err)
        return err

    program = db.config[id_config].program
    ## add task to scheduler
    task = scheduler.queue_task(program, args,
                                repeats = 1, timeout = defs.TASK_TIMEOUT)

    sequence_file = db.sequence_file[id_sequence]
    pre_process_flag = sequence_file.pre_process_flag
    if pre_process_flag and pre_process_flag not in ["COMPLETED", "DONE"]:
        # pre-process still pending: hold the task as STOPPED
        db.scheduler_task[task.id] = dict(status ="STOPPED")

    db.results_file[data_id] = dict(scheduler_task_id = task.id)

    res = {"redirect": "reload",
           "processId": task.id,
           "message": "[%s] c%s: process requested - %s %s" % (data_id, id_config, grep_reads, sequence_file.filename)}

    log.info(res)
    return res

165
166
167
168
169
170
171
172
173
174
175
176
def schedule_fuse(sample_set_ids, config_ids):
    """Queue one 'refuse' task re-fusing the given sample sets for the given configs.

    For every (sample set, config) pair with at least one result, the most
    recent results_file (by run_date) is chosen, and
    [sequence_file_id, config_id, results_file_id, sample_set_id, False]
    is appended to the task arguments. Nothing is queued when no pair matches.
    """
    args = []
    for sample_set_id in sample_set_ids:
        for config_id in config_ids:
            row = db((db.sample_set_membership.sample_set_id == sample_set_id)
                     & (db.sample_set_membership.sequence_file_id == db.results_file.sequence_file_id)
                     & (db.results_file.config_id == config_id)
                     ).select(db.sample_set_membership.sample_set_id,
                              db.sample_set_membership.sequence_file_id,
                              db.results_file.id, db.results_file.config_id,
                              # deterministic pick: newest result first
                              orderby=~db.results_file.run_date).first()
            if row:
                args.append([row.sample_set_membership.sequence_file_id,
                             row.results_file.config_id,
                             row.results_file.id,
                             row.sample_set_membership.sample_set_id,
                             False])

    if args:
        scheduler.queue_task('refuse', [args],
                             repeats = 1, timeout = defs.TASK_TIMEOUT)
180

181
182
183
184
def schedule_compute_extra(id_file, id_config, min_threshold):
    """Queue a one-shot 'compute_extra' scheduler task.

    The task receives [id_file, id_config, min_threshold] and uses
    defs.TASK_TIMEOUT as its timeout.
    """
    task_args = [id_file, id_config, min_threshold]
    scheduler.queue_task('compute_extra',
                         task_args,
                         repeats=1,
                         timeout=defs.TASK_TIMEOUT)

185
def _parse_vidjil_log(log_path):
    """Build the summary string of a vidjil-algo run from its log file.

    Matching "segmented" / "found ... windows" lines are echoed to stdout.
    Returns "R reads, S segmented (P%), W windows", a partial string, or ''.
    """
    segmented = re.compile(r"==> segmented (\d+) reads \((\d*\.\d+|\d+)%\)")
    windows = re.compile(r"==> found (\d+) .*-windows in .* segments .* inside (\d+) sequences")
    info = ''
    with open(log_path) as log_lines:
        for line in log_lines:
            m = segmented.search(line)
            if m:
                print(line, end=' ')
                info = "%d segmented (%s%%)" % (int(m.group(1)), m.group(2))
                continue
            m = windows.search(line)
            if m:
                print(line, end=' ')
                info = "%d reads, " % int(m.group(2)) + info + ", %d windows" % int(m.group(1))
                break
    return info


def run_vidjil(id_file, id_config, id_data, grep_reads,
               clean_before=False, clean_after=False):
    """Run vidjil-algo on a sequence file and store the output in results_file[id_data].

    - pre-process FAILED: raise ValueError.
    - pre-process pending (flag set, not COMPLETED/DONE): queue a new "vidjil"
      task 1200 s later and return "SUCCESS".
    - otherwise: run vidjil-algo into DIR_OUT_VIDJIL_ID % id_data and upload
      the .vidjil file (or seq/clone.fa-1 when grep_reads is given). For a
      normal run, also compute the extra distribution and re-fuse every
      sample set containing the file.

    :param grep_reads: optional sequence passed to --grep-reads
    :param clean_before: unused; the output folder is always wiped first
    :param clean_after: remove the output folder once the result is stored
    :returns: "SUCCESS"
    :raises ValueError: when pre-processing failed
    """
    from subprocess import Popen, PIPE, STDOUT
    from datetime import timedelta as timed
    try:
        from shlex import quote as shell_quote
    except ImportError:  # Python 2
        from pipes import quote as shell_quote

    sequence_file = db.sequence_file[id_file]
    pre_process_flag = sequence_file.pre_process_flag

    if pre_process_flag == "FAILED":
        print("Pre-process has failed")
        raise ValueError('pre-process has failed')

    ## re-schedule if pre_process is still pending
    if pre_process_flag and pre_process_flag not in ["COMPLETED", "DONE"]:
        print("Pre-process is still pending, re-schedule")
        args = [id_file, id_config, id_data, grep_reads]
        task = scheduler.queue_task("vidjil", args,
                                    repeats = 1, timeout = defs.TASK_TIMEOUT,
                                    start_time=request.now + timed(seconds=1200))
        db.results_file[id_data] = dict(scheduler_task_id = task.id)
        db.commit()
        print(task.id)
        sys.stdout.flush()
        return "SUCCESS"

    ## paths to vidjil and to the sequence files
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    p = Popen("rm -rf " + out_folder, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = defs.DIR_SEQUENCES + sequence_file.data_file

    ## vidjil configuration
    vidjil_cmd = db.config[id_config].command
    if 'next' in vidjil_cmd:
        vidjil_cmd = vidjil_cmd.replace('next', '')
        vidjil_cmd = vidjil_cmd.replace(' germline' , defs.DIR_GERMLINE_NEXT)
        cmd = defs.DIR_VIDJIL_NEXT + '/vidjil-algo '
    else:
        vidjil_cmd = vidjil_cmd.replace(' germline' , defs.DIR_GERMLINE)
        cmd = defs.DIR_VIDJIL + '/vidjil-algo '

    if grep_reads:
        # grep_reads is user-supplied: quote it so it cannot inject shell syntax
        vidjil_cmd += ' --grep-reads %s ' % shell_quote(grep_reads)

    os.makedirs(out_folder)
    out_log = out_folder + '/' + output_filename + '.vidjil.log'

    with open(out_log, 'w') as vidjil_log_file:
        try:
            ## full command
            cmd += ' -o  ' + out_folder + " -b " + output_filename
            cmd += ' ' + vidjil_cmd + ' ' + seq_file

            ## run vidjil
            print("=== Launching Vidjil ===")
            print(cmd)
            print("========================")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=vidjil_log_file, stderr=STDOUT, close_fds=True)
            p.communicate()

            print("Output log in " + out_log)
            sys.stdout.flush()
            db.commit()

            ## Get result file
            if grep_reads:
                out_results = out_folder + '/seq/clone.fa-1'
            else:
                out_results = out_folder + '/' + output_filename + '.vidjil'

            print("===>", out_results)
            results_filepath = os.path.abspath(out_results)
            stream = open(results_filepath, 'rb')
        except Exception:
            print("!!! Vidjil failed, no result file")
            res = {"message": "[%s] c%s: Vidjil FAILED - %s" % (id_data, id_config, out_folder)}
            log.error(res)
            raise

    ## Parse some info in .log
    info = _parse_vidjil_log(out_log)

    ## store in the database (the upload field reads the stream during the update)
    ts = time.time()
    with stream:
        db.results_file[id_data] = dict(status = "ready",
                                        run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file = stream
                                        )
    db.commit()

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

    ## TODO parse result success/fail
    res = {"message": "[%s] c%s: Vidjil finished - %s - %s" % (id_data, id_config, info, out_folder)}
    log.info(res)

    if not grep_reads:
        memberships = db(db.sample_set_membership.sequence_file_id == id_file).select()
        if memberships:
            # the distribution depends only on (id_file, id_config): compute it once
            compute_extra(id_file, id_config, 5)
        for row in memberships:
            print(row.sample_set_id)
            run_fuse(id_file, id_config, id_data, row.sample_set_id, clean_before = False)

    return "SUCCESS"
338

Alexander Shlemov's avatar
Alexander Shlemov committed
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
def run_igrec(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Run IgReC on a sequence file and store its .vidjil output in results_file[id_data].

    The output goes to DIR_OUT_VIDJIL_ID % id_data (wiped first). The original
    file name and the command line are added to the result JSON. Then every
    sample set containing the file is re-fused.

    :param clean_before: unused; the output folder is always wiped first
    :param clean_after: unused
    :returns: "SUCCESS"
    """
    from subprocess import Popen, PIPE, STDOUT
    import shutil

    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    shutil.rmtree(out_folder, ignore_errors=True)

    ## path of the sequence file
    sequence_file = db(db.sequence_file.id == id_file).select()[0]
    seq_file = defs.DIR_SEQUENCES + sequence_file.data_file

    ## IgReC configuration
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)
    out_log = out_folder + '/vidjil-igrec.log'
    out_results = out_folder + "/out/igrec.vidjil"

    with open(out_log, 'w') as log_file:
        try:
            ## full command
            igrec = defs.DIR_IGREC + '/igrec.py'
            if not os.path.isfile(igrec):
                print("!!! IgReC binary file not found")
            cmd = "%s -s %s -o %s/out %s" % (igrec, seq_file, out_folder, arg_cmd)

            ## run IgReC
            print("=== Launching IgReC ===")
            print(cmd)
            print("========================")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
            p.wait()

            print("Output log in " + out_log)
            sys.stdout.flush()

            ## Get result file (opening it fails when IgReC produced nothing)
            print("===>", out_results)
            results_filepath = os.path.abspath(out_results)
            open(results_filepath, 'rb').close()
        except Exception:
            print("!!! IgReC failed, no result file")
            res = {"message": "[%s] c%s: IgReC FAILED - %s" % (id_data, id_config, out_folder)}
            log.error(res)
            raise

    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)
    fill_field(my_json, sequence_file.data_file, "original_names", "samples")
    fill_field(my_json, cmd, "commandline", "samples")
    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## store in the database (the upload field reads the stream during the update)
    ts = time.time()
    with open(results_filepath, 'rb') as stream:
        db.results_file[id_data] = dict(status = "ready",
                                        run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file = stream
                                        )
    db.commit()

    res = {"message": "[%s] c%s: IgReC - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    for row in db(db.sample_set_membership.sequence_file_id == id_file).select():
        print(row.sample_set_id)
        run_fuse(id_file, id_config, id_data, row.sample_set_id, clean_before = False)

    return "SUCCESS"

430
def run_mixcr(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Run MiXCR (align, assemble, exportClones) and store the result in results_file[id_data].

    The config command must be "args_align | args_assemble | args_exportClones".
    The align/assemble reports, original file name, command line and total
    reads are added to the exported JSON. Then every sample set containing
    the file is re-fused.

    :param clean_before: unused; the output folder is always wiped first
    :param clean_after: unused
    :returns: "SUCCESS"
    """
    from subprocess import Popen, PIPE, STDOUT

    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    p = Popen("rm -rf " + out_folder, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    sequence_file = db(db.sequence_file.id == id_file).select()[0]
    seq_file = defs.DIR_SEQUENCES + sequence_file.data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data

    ## MiXCR configuration
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)
    # os.path.join adds the separator that plain concatenation omitted, so every
    # MiXCR file really lands inside out_folder
    base = os.path.join(out_folder, output_filename)
    out_log = base + '.mixcr.log'
    report = base + '.mixcr.report'
    out_alignments = base + '.align.vdjca'
    out_clones = base + '.clones.clns'
    out_results = base + '.mixcr'
    align_report = report + '.aln'
    assembly_report = report + '.asmbl'

    args_1, args_2, args_3 = '', '', ''
    arg_cmds = arg_cmd.split('|')
    if len(arg_cmds) == 3:
        args_1, args_2, args_3 = arg_cmds
    else:
        print(arg_cmd)
        print("! Bad arguments, we expect args_align | args_assemble | args_exportClones")

    with open(out_log, 'w') as log_file:
        try:
            ## full command
            mixcr = defs.DIR_MIXCR + 'mixcr'
            cmd = mixcr + ' align --save-reads -t 1 -r ' + align_report + ' ' + args_1 + ' ' + seq_file + ' ' + out_alignments
            cmd += ' && '
            cmd += mixcr + ' assemble -t 1 -r ' + assembly_report + ' ' + args_2 + ' ' + out_alignments + ' ' + out_clones
            cmd += ' && rm ' + out_alignments
            cmd += ' && '
            cmd += mixcr + ' exportClones --format vidjil -germline -id -name -reads -sequence -top -seg -s ' + args_3 + ' ' + out_clones + ' ' + out_results

            ## run MiXCR
            print("=== Launching MiXCR ===")
            print(cmd)
            print("========================")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
            p.wait()

            print("Output log in " + out_log)
            sys.stdout.flush()

            ## Get result file (opening it fails when MiXCR produced nothing)
            print("===>", out_results)
            results_filepath = os.path.abspath(out_results)
            open(results_filepath, 'rb').close()
        except Exception:
            print("!!! MiXCR failed, no result file")
            res = {"message": "[%s] c%s: MiXCR FAILED - %s" % (id_data, id_config, out_folder)}
            log.error(res)
            raise

    align_log = get_file_content(align_report)
    assembly_log = get_file_content(assembly_report)
    reports = align_log + assembly_log
    totalReads = extract_total_reads(assembly_log)

    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)
    fill_field(my_json, reports, "log", "samples", True)
    fill_field(my_json, sequence_file.data_file, "original_names", "samples")
    fill_field(my_json, cmd, "commandline", "samples")
    fill_field(my_json, totalReads, "total", "reads")
    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## store in the database (the upload field reads the stream during the update)
    ts = time.time()
    with open(results_filepath, 'rb') as stream:
        db.results_file[id_data] = dict(status = "ready",
                                        run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file = stream
                                        )
    db.commit()

    res = {"message": "[%s] c%s: MiXCR finished - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    for row in db(db.sample_set_membership.sequence_file_id == id_file).select():
        print(row.sample_set_id)
        run_fuse(id_file, id_config, id_data, row.sample_set_id, clean_before = False)

    return "SUCCESS"
544

545
def run_copy(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Store the uploaded sequence file itself as the result of results_file[id_data].

    Wipes and recreates DIR_OUT_VIDJIL_ID % id_data (with an empty log file),
    stores a copy of the sequence file in the results_file upload field, then
    re-fuses every sample set containing the file.

    :param clean_before: unused; the output folder is always wiped first
    :param clean_after: remove the output folder once the result is stored
    :returns: "SUCCESS"
    :raises IOError: when the sequence file cannot be opened
    """
    from subprocess import Popen, PIPE, STDOUT

    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    p = Popen("rm -rf " + out_folder, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    sequence_file = db(db.sequence_file.id == id_file).select()[0]
    filename = sequence_file.data_file

    os.makedirs(out_folder)
    out_log = out_folder + '/' + output_filename + '.vidjil.log'
    # nothing is written to this log; it is only created
    open(out_log, 'w').close()

    print("Output log in " + out_log)
    sys.stdout.flush()
    db.commit()

    ## fetch the file
    results_filepath = os.path.abspath(defs.DIR_SEQUENCES + filename)
    try:
        stream = open(results_filepath, 'rb')
    except IOError:
        print("!!! 'copy' failed, no file")
        # the message used to reference an undefined name, raising NameError here
        res = {"message": "[%s] c%s: 'copy' FAILED - %s - %s" % (id_data, id_config, results_filepath, out_folder)}
        log.error(res)
        raise

    ## store in the database
    ts = time.time()
    with stream:
        stored_name = db.results_file.data_file.store(stream, sequence_file.filename)
    db.results_file[id_data] = dict(status = "ready",
                                    run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                    data_file = stored_name
                                    )
    db.commit()

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

    ## TODO parse result success/fail
    res = {"message": "[%s] c%s: 'copy' finished - %s" % (id_data, id_config, filename)}
    log.info(res)

    for row in db(db.sample_set_membership.sequence_file_id == id_file).select():
        print(row.sample_set_id)
        run_fuse(id_file, id_config, id_data, row.sample_set_id, clean_before = False)

    return "SUCCESS"
Mathieu Giraud's avatar
Mathieu Giraud committed
607
608


609
610
611
612
def run_refuse(args):
    """Call run_fuse once per entry of *args* and return "SUCCESS".

    Each entry is indexed as (sequence_file_id, config_id, results_file_id,
    sample_set_id, clean_before). Presumably these are the lists built by
    schedule_fuse; verify against the task registration.
    """
    for fuse_args in args:
        file_id = fuse_args[0]
        config_id = fuse_args[1]
        data_id = fuse_args[2]
        sample_set_id = fuse_args[3]
        clean_before = fuse_args[4]
        run_fuse(file_id, config_id, data_id, sample_set_id, clean_before)
    return "SUCCESS"
Mathieu Giraud's avatar
Mathieu Giraud committed
613

614
def run_fuse(id_file, id_config, id_data, sample_set_id, clean_before=True, clean_after=False):
    """Fuse the latest result of every sequence file of a sample set for one config.

    Runs fuse.py into DIR_OUT_VIDJIL_ID % id_data and stores the .fused output
    in the fused_file row for (sample_set_id, id_config), creating that row if
    needed.

    :param id_file: unused
    :param clean_before: wipe and recreate the output folder first
    :param clean_after: remove the output folder once the result is stored
    :returns: "SUCCESS", or "FAILED" when no result file is available
    """
    from subprocess import Popen, PIPE, STDOUT

    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data + '-%s' % sample_set_id

    if clean_before:
        p = Popen("rm -rf " + out_folder, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()
        os.makedirs(out_folder)

    out_log = out_folder + '/' + output_filename + '.fuse.log'
    output_file = out_folder + '/' + output_filename + '.fused'

    rows = db((db.results_file.sequence_file_id == db.sequence_file.id)
              & (db.sample_set_membership.sequence_file_id == db.sequence_file.id)
              & (db.sample_set_membership.sample_set_id == sample_set_id)
              & (db.results_file.config_id == id_config)
              ).select(orderby=db.sequence_file.id | ~db.results_file.run_date)

    # rows are grouped by sequence file, newest result first: keep the first of each group
    data_files = []
    sequence_ids = []
    previous_id = None
    for row in rows:
        if row.sequence_file.id == previous_id:
            continue
        previous_id = row.sequence_file.id
        if row.results_file.data_file is not None:
            data_files.append(defs.DIR_RESULTS + row.results_file.data_file)
            sequence_ids.append(str(row.results_file.sequence_file_id))

    if not data_files:
        print("!!! Fuse failed: no files to fuse")
        res = {"message": "[%s] c%s: 'fuse' FAILED - %s no files to fuse" % (id_data, id_config, output_file)}
        log.error(res)
        return "FAILED"

    files = "".join(path + " " for path in data_files)
    sequence_file_list = "".join(seq_id + "_" for seq_id in sequence_ids)

    with open(out_log, 'w') as fuse_log_file:
        try:
            fuse_cmd = db.config[id_config].fuse_command
            cmd = "python " + defs.DIR_FUSE + "/fuse.py -o " + output_file + " " + fuse_cmd + " " + files

            print("=== fuse.py ===")
            print(cmd)
            print("===============")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=fuse_log_file, stderr=STDOUT, close_fds=True)
            p.communicate()
            print("Output log in " + out_log)

            fuse_filepath = os.path.abspath(output_file)
            stream = open(fuse_filepath, 'rb')
        except Exception:
            print("!!! Fuse failed, no .fused file")
            res = {"message": "[%s] c%s: 'fuse' FAILED - %s" % (id_data, id_config, output_file)}
            log.error(res)
            raise

    ts = time.time()

    existing = db((db.fused_file.config_id == id_config) &
                  (db.fused_file.sample_set_id == sample_set_id)
                  ).select().first()
    existing_fused_file = None
    if existing is not None:
        id_fuse = existing.id
        existing_fused_file = existing.fused_file
    else:
        id_fuse = db.fused_file.insert(sample_set_id = sample_set_id,
                                       config_id = id_config)

    # the upload field reads the stream during the update
    with stream:
        db.fused_file[id_fuse] = dict(fuse_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                      fused_file = stream,
                                      sequence_file_list = sequence_file_list)
    db.commit()

    # Remove the temporary fused file before clean_after may delete its folder
    # (removing it afterwards raised OSError when clean_after was set)
    os.remove(output_file)

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

        # remove previous fused_file if it exists
        # NOTE(review): this path lies inside out_folder, which was just removed,
        # so this is effectively a no-op; the stored file presumably lives in the
        # upload folder instead -- confirm the intended location.
        if existing_fused_file is not None:
            clean_cmd = "rm -rf %s/%s" % (out_folder, existing_fused_file)
            p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
            p.wait()

    res = {"message": "[%s] c%s: 'fuse' finished - %s" % (id_data, id_config, db.fused_file[id_fuse].fused_file)}
    log.info(res)

    return "SUCCESS"
715

716
def custom_fuse(file_list):
    '''
    Fuse several results files on the fuse server and return the fused data.

    file_list: ids of db.results_file rows; rows whose data_file is None are skipped.

    The fuse command is sent over XML-RPC to defs.FUSE_SERVER:defs.PORT_FUSE_SERVER,
    which returns the path of the produced .fused file; that file is read and
    parsed as JSON. A temporary folder (defs.DIR_OUT_VIDJIL_ID with a random id)
    is created for the output and removed again on success.

    Raises IOError when no fuse server port is configured. Any error while
    fusing or reading the result is logged and re-raised.
    '''
    from subprocess import Popen, PIPE, STDOUT

    if defs.PORT_FUSE_SERVER is None:
        raise IOError('This server cannot fuse custom data')
    random_id = random.randint(99999999,99999999999)
    out_folder = os.path.abspath(defs.DIR_OUT_VIDJIL_ID % random_id)
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % random_id

    # Argument list (no shell): a folder path containing spaces or glob
    # characters cannot expand into other rm targets.
    p = Popen(["rm", "-rf", out_folder], stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.communicate()
    os.makedirs(out_folder)

    # '%s' formatting accepts integer ids as well as (unicode) strings
    ids = ','.join('%s' % file_id for file_id in file_list)
    res = {"message": "'custom fuse' (%d files): %s" % (len(file_list), ids)}
    log.info(res)

    ## fuse.py
    output_file = out_folder+'/'+output_filename+'.fused'
    files = ""
    for file_id in file_list:
        data_file = db.results_file[file_id].data_file
        if data_file is not None:
            files += os.path.abspath(defs.DIR_RESULTS + data_file) + " "

    try:
        cmd = "python "+ os.path.abspath(defs.DIR_FUSE) +"/fuse.py -o "+output_file+" -t 100 "+files
        proc_srvr = xmlrpclib.ServerProxy("http://%s:%d" % (defs.FUSE_SERVER, defs.PORT_FUSE_SERVER))
        fuse_filepath = proc_srvr.fuse(cmd, out_folder, output_filename)

        # 'with' closes the fused file even when parsing fails
        with open(fuse_filepath, 'rb') as fused:
            data = gluon.contrib.simplejson.loads(fused.read())
    except:
        # Log whatever went wrong (server call, I/O or parsing), then let it propagate
        res = {"message": "'custom fuse' -> IOError"}
        log.error(res)
        raise

    p = Popen(["rm", "-rf", out_folder], stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.communicate()

    res = {"message": "'custom fuse' -> finished"}
    log.info(res)

    return data
760

HERBERT Ryan's avatar
HERBERT Ryan committed
761
#TODO move this ?
762
763
764
765
766
767
def get_file_content(filename):
    '''
    Return the whole content of `filename`, read in binary mode.
    '''
    with open(filename, 'rb') as source:
        return source.read()

HERBERT Ryan's avatar
HERBERT Ryan committed
768
769
770
771
772
773
774
775
776
777
778
779
780
def fill_field(dest, value, field, parent="", append=False):
    '''
    Store `value` as the first element of the list dest[field], in place.

    If `parent` is neither None nor "", dest[parent] is used instead of dest.
    When the field is absent it is created as [value]; otherwise its first
    element is replaced by `value`, or incremented by it when `append` is true.
    Returns None.
    '''
    target = dest if parent in (None, "") else dest[parent]

    if field not in target:
        target[field] = [value]
    elif append:
        target[field][0] += value
    else:
        target[field][0] = value

HERBERT Ryan's avatar
HERBERT Ryan committed
781
782
783
784
def extract_total_reads(report):
    '''
    Return, as a string, the number that follows the first
    "Total Reads analysed: " in `report`.

    Raises AttributeError when `report` contains no such line.
    '''
    found = re.search(r"Total Reads analysed: ([0-9]+)", report)
    return found.group(1)
785
786
787
788
789
790
791
792
793
794
795
796
797
798

def schedule_pre_process(sequence_file_id, pre_process_id):
    '''
    Queue a "pre_process" scheduler task for a sequence file.

    If assert_scheduler_task_does_not_exist reports a task already registered
    with the same arguments, that error dict is logged and returned.
    Otherwise the new task id is stored in
    sequence_file.pre_process_scheduler_task_id, and a dict with "redirect"
    and "message" keys is logged and returned.
    '''
    task_args = [pre_process_id, sequence_file_id]

    duplicate = assert_scheduler_task_does_not_exist(str(task_args))
    if duplicate:
        log.error(duplicate)
        return duplicate

    task = scheduler.queue_task("pre_process", task_args,
                                repeats=1, timeout=defs.TASK_TIMEOUT)
    db.sequence_file[sequence_file_id] = dict(pre_process_scheduler_task_id=task.id)

    message = "{%s} (%s): process requested" % (sequence_file_id, pre_process_id)
    res = {"redirect": "reload", "message": message}
    log.info(res)
    return res


808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
def get_preprocessed_filename(filename1, filename2):
    '''
    Build the name of a pre-processed file from the names of its input file(s).

    The common suffix of both names (from tools_utils.get_common_suffpref) is kept
    as extension, prefixed by the substring the remaining parts share (from
    tools_utils.common_substring), or by both remaining parts joined with '_'
    when that substring is empty. When filename2 is empty or None, filename1 is
    returned unchanged.

    >>> get_preprocessed_filename('lsdkj_toto-tata_mlkmfsdlkf.fastq.gz', 'rete_toto-tata_eekjdf.fastq.gz')
    '_toto-tata_.fastq.gz'
    '''
    if not filename2:
        return filename1
    extension = tools_utils.get_common_suffpref([filename1, filename2], min(len(filename1), len(filename2)), -1)
    # Slice with an explicit end index: x[0:-len(extension)] would be x[0:0] == ''
    # for an empty extension (-0 == 0), wiping out both names.
    without_extension = [x[:len(x) - len(extension)] for x in [filename1, filename2]]
    common = tools_utils.common_substring(without_extension)
    if len(common) == 0:
        common = '_'.join(without_extension)
    return common+extension


825
def run_pre_process(pre_process_id, sequence_file_id, clean_before=True, clean_after=False):
    '''
    Run a pre-process on sequence_file.data_file (and possibly sequence_file.data_file2),
    put the output back in sequence_file.data_file.

    The command template of the pre_process row is expanded (&file1&, &file2&,
    &result&, &pear&) and run through the shell from defs.DIR_FUSE; its output
    goes to <out_folder>/<output_filename>.pre.log.

    On success: data_file is replaced by the result, data_file2 is cleared,
    pre_process_flag is set to "COMPLETED", the STOPPED scheduler tasks of this
    sequence file are set back to "QUEUED", and "SUCCESS" is returned.
    If anything fails up to opening the result file, pre_process_flag is set
    to "FAILED" and the exception is re-raised.

    clean_before: remove and recreate the output folder before running.
    clean_after: remove the output folder at the end.
    '''
    from subprocess import Popen, PIPE, STDOUT

    sequence_file = db.sequence_file[sequence_file_id]

    db.sequence_file[sequence_file_id] = dict(pre_process_flag = "RUN")
    db.commit()

    out_folder = defs.DIR_PRE_VIDJIL_ID % sequence_file_id
    output_filename = get_preprocessed_filename(get_original_filename(sequence_file.data_file),
                                                get_original_filename(sequence_file.data_file2))

    if clean_before:
        # Argument list (no shell): a folder path containing spaces or glob
        # characters cannot expand into other rm targets.
        p = Popen(["rm", "-rf", out_folder], stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.communicate()
        os.makedirs(out_folder)

    output_file = out_folder+'/'+output_filename

    pre_process = db.pre_process[pre_process_id]

    # Opened inside the try below; kept here so the failure path can close it
    log_file = None
    try:
        cmd = pre_process.command.replace( "&file1&", defs.DIR_SEQUENCES + sequence_file.data_file)
        if sequence_file.data_file2:
            cmd = cmd.replace( "&file2&", defs.DIR_SEQUENCES + sequence_file.data_file2)
        cmd = cmd.replace( "&result&", output_file)
        cmd = cmd.replace("&pear&", defs.DIR_PEAR)

        # Example of template to add some preprocess shortcut
        # cmd = cmd.replace("&preprocess_template&", defs.DIR_preprocess_template)
        # Where &preprocess_template& is the shortcut to change and
        # defs.DIR_preprocess_template the variable to set into the file defs.py.
        # The value should be the path to access to the preprocess software.

        print("=== Pre-process %s ===" % pre_process_id)
        print(cmd)
        print("===============")
        sys.stdout.flush()

        out_log = out_folder+'/'+output_filename+'.pre.log'
        log_file = open(out_log, 'w')

        # Note: this changes the working directory of the whole worker process
        os.chdir(defs.DIR_FUSE)
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=log_file, close_fds=True)
        p.communicate()
        # NOTE(review): the exit status is not checked; a failure is only
        # detected below when the result file cannot be opened.

        print("Output log in " + out_log)

        filepath = os.path.abspath(output_file)

        stream = open(filepath, 'rb')
    except:
        # Catch everything so the sequence file is always marked FAILED, then re-raise
        print("!!! Pre-process failed, no result file")
        res = {"message": "{%s} p%s: 'pre_process' FAILED - %s" % (sequence_file_id, pre_process_id, output_file)}
        log.error(res)

        db.sequence_file[sequence_file_id] = dict(pre_process_flag = "FAILED")
        db.commit()
        if log_file is not None:
            log_file.close()
        raise

    # Now we update the sequence file with the result of the pre-process
    # We forget the initial data_file (and possibly data_file2)
    try:
        db.sequence_file[sequence_file_id] = dict(data_file = stream,
                                                  data_file2 = None,
                                                  pre_process_flag = "COMPLETED")
        db.commit()
    finally:
        # The update above stores the content (the file itself is removed below)
        stream.close()

    #resume STOPPED task for this sequence file
    stopped_task = db(
        (db.results_file.sequence_file_id == sequence_file_id)
        & (db.results_file.scheduler_task_id == db.scheduler_task.id)
        & (db.scheduler_task.status == "STOPPED")
    ).select()

    for row in stopped_task :
        db.scheduler_task[row.scheduler_task.id] = dict(status = "QUEUED")

    db.commit()

    # Dump log in scheduler_run.run_output
    log_file.close()
    with open(out_log) as dumped_log:
        for l in dumped_log:
            print(l, end=' ')

    # Remove data file from disk to save space (it is now saved elsewhere)
    os.remove(filepath)

    if clean_after:
        p = Popen(["rm", "-rf", out_folder], stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.communicate()

    res = {"message": "{%s} p%s: 'pre_process' finished - %s" % (sequence_file_id, pre_process_id, output_file)}
    log.info(res)

    return "SUCCESS"
    
HERBERT Ryan's avatar
HERBERT Ryan committed
926
    
927
from gluon.scheduler import Scheduler

# Task registry of the web2py scheduler: each key is a task name (as passed to
# scheduler.queue_task, e.g. "pre_process") and each value is the function run
# for that task.
scheduler = Scheduler(db,
                      {'vidjil': run_vidjil,
                       'compute_contamination': compute_contamination,
                       'compute_extra': compute_extra,
                       'mixcr': run_mixcr,
                       'igrec': run_igrec,
                       'none': run_copy,
                       'pre_process': run_pre_process,
                       'refuse': run_refuse},
                      heartbeat=defs.SCHEDULER_HEARTBEAT)