task.py 28.7 KB
Newer Older
1
# coding: utf8
Mathieu Giraud's avatar
Mathieu Giraud committed
2
3
from __future__ import print_function

4
import json
5
import os
6
import defs
7
import re
8
import os.path
9
10
11
import time
import sys
import datetime
12
13
import random
import xmlrpclib
14
import tools_utils
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

def assert_scheduler_task_does_not_exist(args):
    """
    Check whether a scheduler task with the given arguments is still active.

    :param args: task arguments, already converted to a string (callers pass
                 ``str(args_list)``); compared for equality with
                 ``db.scheduler_task.args``.
    :return: ``{"message": "task already registered"}`` when at least one task
             with these args has a status other than FAILED, EXPIRED, TIMEOUT
             or COMPLETED; ``None`` otherwise.
    """
    ## check scheduled run
    finished_statuses = ("FAILED", "EXPIRED", "TIMEOUT", "COMPLETED")
    query = (db.scheduler_task.args == args)
    for status in finished_statuses:
        query &= (db.scheduler_task.status != status)
    active_tasks = db(query).select()

    if len(active_tasks) == 0:
        return None
    return {"message": "task already registered"}

def compute_contamination(sequence_file_id, results_file_id, config_id):
    """
    Look for clones of a sample that may come from another sample of the same run.

    The three arguments are parallel lists. Item ``i`` of each describes one
    analysed sample: its sequence_file id, the results_file holding its
    clones, and the config id.

    For each analysed sample, the other sequence files of its "run" sample
    set are considered, using their most recent results_file for the same
    config. Results without a data file (scheduled but not finished) are
    skipped. A clone of the analysed sample is counted when another sample
    has the same clone id with more than 10 times as many reads.

    :return: one dict per analysed sample, with
             - "total_clones": number of (clone, other sample) matches,
             - "total_reads": sum of the analysed sample's reads over those matches,
             - "sample": {other results_file id: {"clones": ..., "reads": ...}}.
    """
    result = []
    for i, seq_id in enumerate(sequence_file_id):
        sample_result = {"total_clones": 0, "total_reads": 0, "sample": {}}
        result.append(sample_result)

        # Reads of each clone of the analysed sample, keyed by clone id
        own_path = defs.DIR_RESULTS + db.results_file[results_file_id[i]].data_file
        with open(own_path, "r") as json_data:
            own_clones = json.load(json_data)["clones"]
        own_reads = {clone["id"]: clone["reads"][0] for clone in own_clones}

        # The "run" sample set this sequence file belongs to, if any
        sample_set_run = db((db.sample_set_membership.sequence_file_id == seq_id)
                            & (db.sample_set_membership.sample_set_id == db.sample_set.id)
                            & (db.sample_set.sample_type == "run")).select().first()
        if sample_set_run is None:
            continue

        rows = db((db.sample_set.id == sample_set_run.sample_set.id)
                  & (db.sample_set.id == db.sample_set_membership.sample_set_id)
                  & (db.sequence_file.id == db.sample_set_membership.sequence_file_id)
                  & (db.sequence_file.id != seq_id)
                  & (db.results_file.sequence_file_id == db.sequence_file.id)
                  & (db.results_file.config_id == config_id[i])
                  ).select(db.sequence_file.ALL, db.results_file.ALL, db.sample_set.id,
                           orderby=db.sequence_file.id | ~db.results_file.run_date)

        # Rows are sorted by sequence file, most recent run first:
        # keep only the latest results_file of each other sequence file.
        latest = []
        previous_id = None
        for row in rows:
            if row.sequence_file.id != previous_id:
                latest.append(row)
                previous_id = row.sequence_file.id

        for row in latest:
            # A run scheduled by schedule_run but not finished has no data file yet
            if row.results_file.data_file is None:
                continue
            other_path = defs.DIR_RESULTS + row.results_file.data_file
            print(other_path)
            counts = {"clones": 0, "reads": 0}
            sample_result["sample"][row.results_file.id] = counts
            with open(other_path, "r") as json_data2:
                try:
                    other_clones = json.load(json_data2)["clones"]
                    for clone in other_clones:
                        clone_id = clone["id"]
                        if clone_id in own_reads and clone["reads"][0] > 10 * own_reads[clone_id]:
                            sample_result["total_clones"] += 1
                            sample_result["total_reads"] += own_reads[clone_id]
                            counts["clones"] += 1
                            counts["reads"] += own_reads[clone_id]
                except ValueError:
                    print('invalid_json')

    return result

def schedule_run(id_sequence, id_config, grep_reads=None):
    """
    Create a results_file entry and queue the scheduler task that will fill it.

    The queued task is the config's ``program``, called with
    ``[id_sequence, id_config, data_id, grep_reads]``. If the sequence file's
    pre-process is still waiting or running, the task is created STOPPED.
    run_pre_process sets STOPPED tasks of the file back to QUEUED.

    :param id_sequence: sequence_file id to analyse
    :param id_config: config id (gives the program to run)
    :param grep_reads: optional, forwarded to the task
    :return: a dict with "redirect" and "message" on success, or the error
             dict returned by assert_scheduler_task_does_not_exist.
    """
    ts = time.time()
    data_id = db.results_file.insert(sequence_file_id=id_sequence,
                                     run_date=datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     config_id=id_config)

    args = [id_sequence, id_config, data_id, grep_reads]

    # NOTE(review): data_id has just been created, so these args cannot match an
    # already registered task: this check does not prevent duplicate runs of
    # the same sequence/config. Confirm the intended behaviour.
    err = assert_scheduler_task_does_not_exist(str(args))
    if err:
        log.error(err)
        return err

    program = db.config[id_config].program
    ## add task to scheduler
    task = scheduler.queue_task(program, args,
                                repeats=1, timeout=defs.TASK_TIMEOUT)

    sequence_file = db.sequence_file[id_sequence]
    # Pre-process not finished yet: hold the task until it is resumed
    if sequence_file.pre_process_flag in ("WAIT", "RUN"):
        db.scheduler_task[task.id] = dict(status="STOPPED")

    db.results_file[data_id] = dict(scheduler_task_id=task.id)

    res = {"redirect": "reload",
           "message": "[%s] c%s: process requested - %s %s" % (data_id, id_config, grep_reads, sequence_file.filename)}

    log.info(res)
    return res

def schedule_fuse(sample_set_ids, config_ids):
    """
    Queue one 'refuse' scheduler task covering several sample sets and configs.

    For every (sample_set_id, config_id) pair, the first matching results_file
    of a sequence file of that sample set provides one argument list:
    ``[sequence_file_id, config_id, results_file_id, sample_set_id, False]``.
    The last item is run_fuse's ``clean_before``. Nothing is queued when no
    pair has any result.
    """
    refuse_args = []
    for set_id in sample_set_ids:
        for conf_id in config_ids:
            match = db((db.sample_set_membership.sample_set_id == set_id)
                       & (db.sample_set_membership.sequence_file_id == db.results_file.sequence_file_id)
                       & (db.results_file.config_id == conf_id)
                       ).select(db.sample_set_membership.sample_set_id,
                                db.sample_set_membership.sequence_file_id,
                                db.results_file.id,
                                db.results_file.config_id).first()
            if not match:
                continue
            membership = match.sample_set_membership
            refuse_args.append([membership.sequence_file_id,
                                match.results_file.config_id,
                                match.results_file.id,
                                membership.sample_set_id,
                                False])

    if refuse_args:
        scheduler.queue_task('refuse', [refuse_args],
                             repeats=1, timeout=defs.TASK_TIMEOUT)

def run_vidjil(id_file, id_config, id_data, grep_reads,
               clean_before=False, clean_after=False):
    """
    Scheduler task: run vidjil on a sequence file and store its output.

    The output goes to results_file ``id_data``. Afterwards, unless
    ``grep_reads`` is set, every sample set containing the file is re-fused.

    :param id_file: id of the sequence_file to analyse
    :param id_config: id of the config; its ``command`` holds vidjil options
    :param id_data: id of the results_file receiving the output
    :param grep_reads: if set, vidjil is called with ``-FaW <grep_reads>`` and
                       the stored result is ``seq/clone.fa-1``
    :param clean_before: unused, the output folder is always wiped first
    :param clean_after: remove the output folder once the result is stored
    :return: "SUCCESS". It is also returned when the task is re-scheduled
             (1200 s later) because the pre-process is still pending.
    :raises ValueError: if the pre-process of the sequence file has failed
    """
    from subprocess import Popen, PIPE, STDOUT
    from datetime import timedelta as timed
    try:
        from shlex import quote as shell_quote  # Python 3
    except ImportError:
        from pipes import quote as shell_quote  # Python 2

    pre_process_flag = db.sequence_file[id_file].pre_process_flag

    ## re-schedule if pre_process is still pending
    if pre_process_flag in ("WAIT", "RUN"):
        print("Pre-process is still pending, re-schedule")
        args = [id_file, id_config, id_data, grep_reads]
        task = scheduler.queue_task("vidjil", args,
                                    repeats=1, timeout=defs.TASK_TIMEOUT,
                                    start_time=request.now + timed(seconds=1200))
        db.results_file[id_data] = dict(scheduler_task_id=task.id)
        db.commit()
        print(task.id)
        sys.stdout.flush()
        return "SUCCESS"

    if pre_process_flag == "FAILED":
        print("Pre-process has failed")
        raise ValueError('pre-process has failed')

    ## paths to vidjil / sequence files
    germline_folder = defs.DIR_GERMLINE
    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    cmd = "rm -rf " + out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    row = db(db.sequence_file.id == id_file).select()
    filename = row[0].data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = upload_folder + filename

    ## vidjil configuration
    vidjil_cmd = db.config[id_config].command
    vidjil_cmd = vidjil_cmd.replace(' germline', germline_folder)

    if grep_reads:
        # grep_reads ends up in a shell command line: quote it so that it is
        # passed as a single argument and cannot inject other shell commands
        vidjil_cmd += ' -FaW %s ' % shell_quote(grep_reads)

    os.makedirs(out_folder)

    out_log = out_folder + '/' + output_filename + '.vidjil.log'
    vidjil_log_file = open(out_log, 'w')

    try:
        ## full command
        cmd = defs.DIR_VIDJIL + '/vidjil ' + ' -o  ' + out_folder + " -b " + output_filename
        cmd += ' ' + vidjil_cmd + ' ' + seq_file

        ## launch vidjil
        print("=== Launching Vidjil ===")
        print(cmd)
        print("========================")
        sys.stdout.flush()

        p = Popen(cmd, shell=True, stdin=PIPE, stdout=vidjil_log_file, stderr=STDOUT, close_fds=True)
        p.communicate()

        print("Output log in " + out_log)
        sys.stdout.flush()
        db.commit()

        ## Get result file
        if grep_reads:
            out_results = out_folder + '/seq/clone.fa-1'
        else:
            out_results = out_folder + '/' + output_filename + '.vidjil'

        print("===>", out_results)
        results_filepath = os.path.abspath(out_results)

        stream = open(results_filepath, 'rb')
    except Exception:
        print("!!! Vidjil failed, no result file")
        res = {"message": "[%s] c%s: Vidjil FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        raise
    finally:
        # Closed on both paths (it used to leak when vidjil failed)
        vidjil_log_file.close()

    ## Parse some info in .log
    info = _parse_vidjil_log(out_log)

    ## store the result in the database
    ts = time.time()
    try:
        db.results_file[id_data] = dict(status="ready",
                                        run_date=datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file=stream)
        db.commit()
    finally:
        stream.close()

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

    ## the vidjil output is stored as result for the scheduler
    ## TODO parse result success/fail

    res = {"message": "[%s] c%s: Vidjil finished - %s - %s" % (id_data, id_config, info, out_folder)}
    log.info(res)

    if not grep_reads:
        for membership in db(db.sample_set_membership.sequence_file_id == id_file).select():
            print(membership.sample_set_id)
            run_fuse(id_file, id_config, id_data, membership.sample_set_id, clean_before=False)

    return "SUCCESS"


def _parse_vidjil_log(log_path):
    """
    Echo the 'segmented' and 'windows' summary lines of a vidjil log.

    Returns a short summary built from those lines. The summary is ''
    when neither line is found.
    """
    segmented = re.compile(r"==> segmented (\d+) reads \((\d*\.\d+|\d+)%\)")
    windows = re.compile(r"==> found (\d+) .*-windows in .* segments .* inside (\d+) sequences")
    info = ''
    with open(log_path) as log_lines:
        for line in log_lines:
            m = segmented.search(line)
            if m:
                print(line, end=' ')
                info = "%d segmented (%s%%)" % (int(m.group(1)), m.group(2))
                continue
            m = windows.search(line)
            if m:
                print(line, end=' ')
                info = "%d reads, " % int(m.group(2)) + info + ", %d windows" % int(m.group(1))
                break
    return info

def run_mixcr(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """
    Scheduler task: run MiXCR on a sequence file and store its output.

    The three MiXCR steps are align, assemble and exportClones. The
    vidjil-format output is stored in results_file ``id_data``. The
    align/assemble reports, the original file name, the command line and the
    total number of reads are added to the exported JSON before it is
    stored. Afterwards every sample set containing the sequence file is
    re-fused.

    The config ``command`` must hold three '|'-separated argument groups:
    ``args_align | args_assemble | args_exportClones``. Otherwise a warning
    is printed and MiXCR is run without extra arguments.

    :param clean_before: unused, the output folder is always wiped first
    :param clean_after: unused
    :return: "SUCCESS"
    """
    from subprocess import Popen, PIPE, STDOUT

    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    cmd = "rm -rf " + out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    row = db(db.sequence_file.id == id_file).select()
    filename = row[0].data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = upload_folder + filename

    ## MiXCR configuration
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)
    # All outputs live inside out_folder, with the same layout as run_vidjil
    # (the '/' separator was missing, so files were written next to the folder)
    out_prefix = out_folder + '/' + output_filename
    out_log = out_prefix + '.mixcr.log'
    report = out_prefix + '.mixcr.report'
    out_alignments = out_prefix + '.align.vdjca'
    out_clones = out_prefix + '.clones.clns'
    out_results = out_prefix + '.mixcr'

    align_report = report + '.aln'
    assembly_report = report + '.asmbl'

    arg_cmds = arg_cmd.split('|')
    args_1, args_2, args_3 = '', '', ''
    try:
        args_1, args_2, args_3 = arg_cmds
    except ValueError:
        print(arg_cmd)
        print("! Bad arguments, we expect args_align | args_assemble | args_exportClones")

    log_file = open(out_log, 'w')
    ## full command
    try:
        mixcr = defs.DIR_MIXCR + 'mixcr'
        cmd = mixcr + ' align --save-reads -t 1 -r ' + align_report + ' ' + args_1 + ' ' + seq_file + ' ' + out_alignments
        cmd += ' && '
        cmd += mixcr + ' assemble -t 1 -r ' + assembly_report + ' ' + args_2 + ' ' + out_alignments + ' ' + out_clones
        cmd += ' && rm ' + out_alignments
        cmd += ' && '
        cmd += mixcr + ' exportClones --format vidjil -germline -id -name -reads -sequence -top -seg -s ' + args_3 + ' ' + out_clones + ' ' + out_results

        ## launch MiXCR
        print("=== Launching MiXCR ===")
        print(cmd)
        print("========================")
        sys.stdout.flush()

        p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
        p.wait()

        print("Output log in " + out_log)
        sys.stdout.flush()

        ## Get result file (opening it checks that MiXCR produced it)
        print("===>", out_results)
        results_filepath = os.path.abspath(out_results)
        open(results_filepath, 'rb').close()
    except Exception:
        print("!!! MiXCR failed, no result file")
        res = {"message": "[%s] c%s: MiXCR FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        raise
    finally:
        log_file.close()

    align_log = get_file_content(align_report)
    assembly_log = get_file_content(assembly_report)
    reports = align_log + assembly_log
    original_name = row[0].data_file
    total_reads = extract_total_reads(assembly_log)

    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)

    fill_field(my_json, reports, "log", "samples", True)
    fill_field(my_json, original_name, "original_names", "samples")
    fill_field(my_json, cmd, "commandline", "samples")
    fill_field(my_json, total_reads, "total", "reads")

    # Rewrite the result file with the added fields
    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## store the result in the database
    ts = time.time()
    stream = open(results_filepath, 'rb')
    try:
        db.results_file[id_data] = dict(status="ready",
                                        run_date=datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file=stream)
        db.commit()
    finally:
        stream.close()

    res = {"message": "[%s] c%s: MiXCR finished - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id == id_file).select():
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, membership.sample_set_id, clean_before=False)

    return "SUCCESS"

def run_copy(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """
    Scheduler task for the 'none' program: store the sequence file unprocessed.

    The sequence file itself becomes the result of results_file ``id_data``.
    Afterwards every sample set containing the sequence file is re-fused.

    :param clean_before: unused, the output folder is always wiped first
    :param clean_after: remove the output folder once the result is stored
    :return: "SUCCESS"
    :raises IOError: if the sequence file cannot be opened
    """
    from subprocess import Popen, PIPE, STDOUT

    ## paths to the output folder / sequence files
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    cmd = "rm -rf " + out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    row = db(db.sequence_file.id == id_file).select()
    filename = row[0].data_file

    os.makedirs(out_folder)
    out_log = out_folder + '/' + output_filename + '.vidjil.log'
    # Nothing is written to it: an empty log file is created and closed
    open(out_log, 'w').close()

    print("Output log in " + out_log)
    sys.stdout.flush()
    db.commit()

    ## get the file
    results_filepath = os.path.abspath(defs.DIR_SEQUENCES + filename)

    try:
        stream = open(results_filepath, 'rb')
    except IOError:
        print("!!! 'copy' failed, no file")
        # 'info' used to be referenced here without being defined (NameError)
        res = {"message": "[%s] c%s: 'copy' FAILED - %s - %s" % (id_data, id_config, filename, out_folder)}
        log.error(res)
        raise

    ## store the result in the database
    ts = time.time()
    try:
        db.results_file[id_data] = dict(status="ready",
                                        run_date=datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file=db.results_file.data_file.store(stream, row[0].filename))
        db.commit()
    finally:
        stream.close()

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

    ## the output is stored as result for the scheduler
    ## TODO parse result success/fail

    res = {"message": "[%s] c%s: 'copy' finished - %s" % (id_data, id_config, filename)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id == id_file).select():
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, membership.sample_set_id, clean_before=False)

    return "SUCCESS"

def run_refuse(args):
    """
    Scheduler task: call run_fuse once per entry of ``args``.

    Each entry is ``[id_file, id_config, id_data, sample_set_id, clean_before]``,
    as built by schedule_fuse.
    """
    for entry in args:
        id_file, id_config, id_data = entry[0], entry[1], entry[2]
        sample_set_id, clean_before = entry[3], entry[4]
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before)
    return "SUCCESS"

def run_fuse(id_file, id_config, id_data, sample_set_id, clean_before=True, clean_after=False):
    """
    Fuse the latest results of the sequence files of a sample set.

    For one config, the latest result of every sequence file of the sample
    set is fused with fuse.py, and the output is stored in the fused_file
    table.

    :param id_file: not used by this function
    :param id_config: config whose results are fused; its ``fuse_command``
                      gives the fuse.py options
    :param id_data: results_file id, used to name the working folder
    :param sample_set_id: sample set whose results are fused
    :param clean_before: wipe and recreate the working folder first
    :param clean_after: remove the working folder at the end
    :return: "SUCCESS"
    """
    from subprocess import Popen, PIPE, STDOUT

    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data + '-%s' % sample_set_id

    if clean_before:
        cmd = "rm -rf " + out_folder
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()
        os.makedirs(out_folder)

    fuse_log = out_folder + '/' + output_filename + '.fuse.log'
    fuse_log_file = open(fuse_log, 'w')

    ## fuse.py
    output_file = out_folder + '/' + output_filename + '.fused'
    rows = db((db.results_file.sequence_file_id == db.sequence_file.id)
              & (db.sample_set_membership.sequence_file_id == db.sequence_file.id)
              & (db.sample_set_membership.sample_set_id == sample_set_id)
              & (db.results_file.config_id == id_config)
              ).select(orderby=db.sequence_file.id | ~db.results_file.run_date)

    # Rows are sorted by sequence file, most recent run first:
    # keep only the latest results_file of each sequence file.
    latest = []
    previous_id = 0
    for row in rows:
        if row.sequence_file.id != previous_id:
            latest.append(row)
            previous_id = row.sequence_file.id

    # Space-separated result paths for fuse.py, and '_'-terminated ids stored with the fuse
    files = ""
    sequence_file_list = ""
    for row in latest:
        if row.results_file.data_file is not None:
            files += defs.DIR_RESULTS + row.results_file.data_file + " "
            sequence_file_list += str(row.results_file.sequence_file_id) + "_"

    try:
        fuse_cmd = db.config[id_config].fuse_command
        cmd = "python " + defs.DIR_FUSE + "/fuse.py -o " + output_file + " " + fuse_cmd + " " + files

        print("=== fuse.py ===")
        print(cmd)
        print("===============")
        sys.stdout.flush()

        p = Popen(cmd, shell=True, stdin=PIPE, stdout=fuse_log_file, stderr=STDOUT, close_fds=True)
        p.communicate()
        print("Output log in " + fuse_log)

        fuse_filepath = os.path.abspath(output_file)
        stream = open(fuse_filepath, 'rb')
    except Exception:
        print("!!! Fuse failed, no .fused file")
        res = {"message": "[%s] c%s: 'fuse' FAILED - %s" % (id_data, id_config, output_file)}
        log.error(res)
        raise
    finally:
        fuse_log_file.close()

    ts = time.time()

    fused_files = db((db.fused_file.config_id == id_config) &
                     (db.fused_file.sample_set_id == sample_set_id)
                     ).select()
    existing_fused_file = None
    if len(fused_files) > 0:
        id_fuse = fused_files[0].id
        existing_fused_file = fused_files[0].fused_file
    else:
        id_fuse = db.fused_file.insert(sample_set_id=sample_set_id,
                                       config_id=id_config)

    try:
        db.fused_file[id_fuse] = dict(fuse_date=datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                      fused_file=stream,
                                      sequence_file_list=sequence_file_list)
        db.commit()
    finally:
        stream.close()

    # Remove the temporary fused file now: the clean_after step below deletes
    # the whole folder, after which os.remove would raise OSError
    os.remove(output_file)

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

        # remove previous fused_file if it exists
        # NOTE(review): existing_fused_file is the stored fused_file value and
        # out_folder has just been removed, so this path probably never
        # exists -- confirm where previous fused files are actually stored.
        if existing_fused_file is not None:
            clean_cmd = "rm -rf %s/%s" % (out_folder, existing_fused_file)
            p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
            p.wait()

    res = {"message": "[%s] c%s: 'fuse' finished - %s" % (id_data, id_config, db.fused_file[id_fuse].fused_file)}
    log.info(res)

    return "SUCCESS"

def custom_fuse(file_list):
    """
    Fuse an arbitrary list of results files through the fuse XML-RPC server.

    :param file_list: results_file ids; entries whose data_file is None are skipped
    :return: the content of the fused file, parsed as JSON
    :raises IOError: if no fuse server port is configured (defs.PORT_FUSE_SERVER)
    """
    from subprocess import Popen, PIPE, STDOUT

    if defs.PORT_FUSE_SERVER is None:
        raise IOError('This server cannot fuse custom data')
    random_id = random.randint(99999999, 99999999999)
    out_folder = os.path.abspath(defs.DIR_OUT_VIDJIL_ID % random_id)
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % random_id

    cmd = "rm -rf " + out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()
    os.makedirs(out_folder)

    # '%s' formatting so that integer ids can be joined too
    res = {"message": "'custom fuse' (%d files): %s" % (len(file_list), ','.join('%s' % f for f in file_list))}
    log.info(res)

    ## fuse.py
    output_file = out_folder + '/' + output_filename + '.fused'
    files = ""
    for results_id in file_list:
        data_file = db.results_file[results_id].data_file
        if data_file is not None:
            files += os.path.abspath(defs.DIR_RESULTS + data_file) + " "

    try:
        cmd = "python " + os.path.abspath(defs.DIR_FUSE) + "/fuse.py -o " + output_file + " -t 100 " + files
        proc_srvr = xmlrpclib.ServerProxy("http://%s:%d" % (defs.FUSE_SERVER, defs.PORT_FUSE_SERVER))
        fuse_filepath = proc_srvr.fuse(cmd, out_folder, output_filename)

        with open(fuse_filepath, 'rb') as fused:
            data = gluon.contrib.simplejson.loads(fused.read())
    except Exception:
        res = {"message": "'custom fuse' -> IOError"}
        log.error(res)
        raise

    clean_cmd = "rm -rf " + out_folder
    p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    res = {"message": "'custom fuse' -> finished"}
    log.info(res)

    return data

#TODO move this ?
622
623
624
625
626
627
def get_file_content(filename):
    """Return the full content of ``filename``, read in binary mode."""
    with open(filename, 'rb') as handle:
        data = handle.read()
    return data

HERBERT Ryan's avatar
HERBERT Ryan committed
628
629
630
631
632
633
634
635
636
637
638
639
640
def fill_field(dest, value, field, parent="", append=False):
    """
    Store ``value`` as the first item of the list ``dest[parent][field]``.

    When ``parent`` is None or "", ``dest[field]`` is used instead.
    A missing field is created as ``[value]``. Otherwise its first item is
    replaced by ``value``, or extended with ``+= value`` when ``append`` is
    true. ``dest`` is modified in place.
    """
    target = dest if parent is None or parent == "" else dest[parent]
    if field not in target:
        target[field] = [value]
        return
    if append:
        target[field][0] += value
    else:
        target[field][0] = value

HERBERT Ryan's avatar
HERBERT Ryan committed
641
642
643
644
def extract_total_reads(report):
    """
    Return the number after the first "Total Reads analysed: " in ``report``.

    The number is returned as a string. AttributeError is raised (on None)
    when the report has no such text.
    """
    found = re.search("Total Reads analysed: ([0-9]+)", report)
    return found.group(1)
645
646
647
648
649
650
651
652
653
654
655
656
657
658

def schedule_pre_process(sequence_file_id, pre_process_id):
    """
    Queue a 'pre_process' scheduler task for a sequence file.

    The id of the queued task is saved on the sequence file.

    :return: the error dict from assert_scheduler_task_does_not_exist when an
             active task with the same arguments exists, otherwise a dict with
             "redirect" and "message".
    """
    task_args = [pre_process_id, sequence_file_id]

    already_registered = assert_scheduler_task_does_not_exist(str(task_args))
    if already_registered:
        log.error(already_registered)
        return already_registered

    task = scheduler.queue_task("pre_process", task_args,
                                repeats=1, timeout=defs.TASK_TIMEOUT)
    db.sequence_file[sequence_file_id] = dict(pre_process_scheduler_task_id=task.id)

    message = "{%s} (%s): process requested" % (sequence_file_id, pre_process_id)
    res = {"redirect": "reload", "message": message}
    log.info(res)
    return res

def get_preprocessed_filename(filename1, filename2):
    '''
    Build the file name of the output of a pre-process on one or two files.

    If ``filename2`` is empty or None, ``filename1`` is returned unchanged.
    Otherwise the common extension of both names is computed with
    tools_utils.get_common_suffpref(..., -1), presumably their common end.
    It is preceded by the common substring of both stems, or, when they
    share nothing, by both stems joined with '_'.

    >>> get_preprocessed_filename('lsdkj_toto-tata_mlkmfsdlkf.fastq.gz', 'rete_toto-tata_eekjdf.fastq.gz')
    '_toto-tata_.fastq.gz'
    '''
    if not filename2:
        return filename1
    extension = tools_utils.get_common_suffpref([filename1, filename2], min(len(filename1), len(filename2)), -1)
    # Slice up to len(x) - len(extension) rather than -len(extension): with an
    # empty extension, x[0:-0] is '' and the whole stem would be lost.
    without_extension = [x[:len(x) - len(extension)] for x in [filename1, filename2]]
    common = tools_utils.common_substring(without_extension)
    if len(common) == 0:
        common = '_'.join(without_extension)
    return common + extension


685
def run_pre_process(pre_process_id, sequence_file_id, clean_before=True, clean_after=False):
    '''
    Run a pre-process on sequence_file.data_file and store its output.

    sequence_file.data_file2 is also used when present. The output replaces
    sequence_file.data_file, and data_file2 is cleared.

    The pre-process command placeholders &file1&, &file2&, &result& and &pear&
    are replaced by the input files, the output file and defs.DIR_PEAR.
    The sequence file's pre_process_flag is set to RUN, then DONE or FAILED.
    Scheduler tasks of this file left STOPPED by schedule_run are re-queued.

    :param clean_before: wipe and recreate the working folder first
    :param clean_after: remove the working folder at the end
    :return: "SUCCESS"
    '''
    from subprocess import Popen, PIPE, STDOUT

    sequence_file = db.sequence_file[sequence_file_id]

    db.sequence_file[sequence_file_id] = dict(pre_process_flag="RUN")
    db.commit()

    out_folder = defs.DIR_PRE_VIDJIL_ID % sequence_file_id
    output_filename = get_preprocessed_filename(get_original_filename(sequence_file.data_file),
                                                get_original_filename(sequence_file.data_file2))

    if clean_before:
        cmd = "rm -rf " + out_folder
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()
        os.makedirs(out_folder)

    output_file = out_folder + '/' + output_filename
    out_log = out_folder + '/' + output_filename + '.pre.log'

    pre_process = db.pre_process[pre_process_id]

    log_file = None
    try:
        cmd = pre_process.command.replace("&file1&", defs.DIR_SEQUENCES + sequence_file.data_file)
        if sequence_file.data_file2:
            cmd = cmd.replace("&file2&", defs.DIR_SEQUENCES + sequence_file.data_file2)
        cmd = cmd.replace("&result&", output_file)
        cmd = cmd.replace("&pear&", defs.DIR_PEAR)

        print("=== Pre-process %s ===" % pre_process_id)
        print(cmd)
        print("===============")
        sys.stdout.flush()

        log_file = open(out_log, 'w')

        # The command is run from defs.DIR_FUSE. This changes the current
        # directory of the whole worker process.
        os.chdir(defs.DIR_FUSE)
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=log_file, close_fds=True)
        p.communicate()
        print("Output log in " + out_log)

        filepath = os.path.abspath(output_file)
        stream = open(filepath, 'rb')
    except Exception:
        print("!!! Pre-process failed, no result file")
        res = {"message": "{%s} p%s: 'pre_process' FAILED - %s" % (sequence_file_id, pre_process_id, output_file)}
        log.error(res)

        db.sequence_file[sequence_file_id] = dict(pre_process_flag="FAILED")
        db.commit()
        raise
    finally:
        # Closed on both paths (it used to leak when the pre-process failed)
        if log_file is not None:
            log_file.close()

    # Now we update the sequence file with the result of the pre-process
    # We forget the initial data_file (and possibly data_file2)
    try:
        db.sequence_file[sequence_file_id] = dict(data_file=stream,
                                                  data_file2=None,
                                                  pre_process_flag="DONE")
        db.commit()
    finally:
        stream.close()

    # resume STOPPED tasks for this sequence file
    stopped_task = db(
        (db.results_file.sequence_file_id == sequence_file_id)
        & (db.results_file.scheduler_task_id == db.scheduler_task.id)
        & (db.scheduler_task.status == "STOPPED")
    ).select()

    for row in stopped_task:
        db.scheduler_task[row.scheduler_task.id] = dict(status="QUEUED")

    db.commit()

    # Dump log in scheduler_run.run_output
    with open(out_log) as log_lines:
        for line in log_lines:
            print(line, end=' ')

    # Remove data file from disk to save space (it is now saved elsewhere)
    os.remove(filepath)

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

    res = {"message": "{%s} p%s: 'pre_process' finished - %s" % (sequence_file_id, pre_process_id, output_file)}
    log.info(res)

    return "SUCCESS"

from gluon.scheduler import Scheduler

# Task name -> function running it. The names are the ones given to
# scheduler.queue_task, including a config's ``program``; the 'none'
# program is served by run_copy.
scheduler = Scheduler(
    db,
    {
        'vidjil': run_vidjil,
        'compute_contamination': compute_contamination,
        'mixcr': run_mixcr,
        'none': run_copy,
        'pre_process': run_pre_process,
        'refuse': run_refuse,
    },
    heartbeat=defs.SCHEDULER_HEARTBEAT,
)