Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
melissa
melissa-da
Commits
517a4e38
Commit
517a4e38
authored
Jul 29, 2019
by
FRIEDEMANN Sebastian
Browse files
Renaming
parent
caebf0e5
Changes
3
Hide whitespace changes
Inline
Side-by-side
api/melissa_api.cxx
View file @
517a4e38
...
...
@@ -66,12 +66,12 @@ int getCommSize()
}
struct
ServerRank
struct
ServerRank
Connection
{
void
*
data_request_socket
;
ServerRank
(
const
char
*
addr_request
)
ServerRank
Connection
(
const
char
*
addr_request
)
{
data_request_socket
=
zmq_socket
(
context
,
ZMQ_REQ
);
assert
(
data_request_socket
);
...
...
@@ -84,7 +84,7 @@ struct ServerRank
D
(
"connect socket %p"
,
data_request_socket
);
}
~
ServerRank
()
~
ServerRank
Connection
()
{
D
(
"closing socket %p"
,
data_request_socket
);
zmq_close
(
data_request_socket
);
...
...
@@ -174,9 +174,9 @@ struct Server
Server
server
;
struct
ServerRanks
{
static
map
<
int
,
unique_ptr
<
ServerRank
>>
ranks
;
static
map
<
int
,
unique_ptr
<
ServerRank
Connection
>>
ranks
;
static
ServerRank
&
get
(
int
server_rank
)
static
ServerRank
Connection
&
get
(
int
server_rank
)
{
auto
found
=
ranks
.
find
(
server_rank
);
if
(
found
==
ranks
.
end
())
...
...
@@ -185,7 +185,7 @@ struct ServerRanks {
// we use unique_ptr's as other wise we would create a ServerRank locally, we than would copy all its values in the ranks map
// and then we would destroy it. unfortunately this also closes the zmq connection !
auto
res
=
ranks
.
emplace
(
server_rank
,
unique_ptr
<
ServerRank
>
(
new
ServerRank
(
server
.
port_names
.
data
()
+
server_rank
*
MPI_MAX_PROCESSOR_NAME
)));
unique_ptr
<
ServerRank
Connection
>
(
new
ServerRank
Connection
(
server
.
port_names
.
data
()
+
server_rank
*
MPI_MAX_PROCESSOR_NAME
)));
return
*
res
.
first
->
second
;
}
else
...
...
@@ -194,12 +194,12 @@ struct ServerRanks {
}
}
};
map
<
int
,
unique_ptr
<
ServerRank
>>
ServerRanks
::
ranks
;
map
<
int
,
unique_ptr
<
ServerRank
Connection
>>
ServerRanks
::
ranks
;
struct
ConnectedServerRank
{
size_t
send_count
;
size_t
local_vector_offset
;
ServerRank
&
server_rank
;
ServerRank
Connection
&
server_rank
;
};
struct
Field
{
...
...
@@ -422,7 +422,7 @@ void melissa_finalize() // TODO: when using more serverranks, wait until an end
MPI_Barrier
(
comm
);
//sleep(3);
D
(
"End Simulation."
);
D
(
"server
_
ranks: %lu"
,
ServerRanks
::
ranks
.
size
());
D
(
"server
ranks: %lu"
,
ServerRanks
::
ranks
.
size
());
phase
=
PHASE_FINAL
;
// TODO: free all pointers?
...
...
common/n_to_m.h
View file @
517a4e38
...
...
@@ -10,25 +10,25 @@ struct n_to_m { // TODO: rename datatype into Part
size_t
send_count
;
};
std
::
vector
<
n_to_m
>
calculate_n_to_m
(
int
ranks
_server
,
const
std
::
vector
<
size_t
>
&
local_vect_sizes_simu
)
std
::
vector
<
n_to_m
>
calculate_n_to_m
(
const
int
comm_size
_server
,
const
std
::
vector
<
size_t
>
&
local_vect_sizes_simu
)
{
size_t
ranks
_simu
=
local_vect_sizes_simu
.
size
();
size_t
comm_size
_simu
=
local_vect_sizes_simu
.
size
();
std
::
vector
<
n_to_m
>
parts
;
size_t
local_vect_sizes_server
[
ranks
_server
];
size_t
local_vect_sizes_server
[
comm_size
_server
];
size_t
global_vect_size
=
0
;
for
(
size_t
i
=
0
;
i
<
ranks
_simu
;
++
i
)
for
(
size_t
i
=
0
;
i
<
comm_size
_simu
;
++
i
)
{
global_vect_size
+=
local_vect_sizes_simu
[
i
];
}
for
(
int
i
=
0
;
i
<
ranks
_server
;
++
i
)
for
(
int
i
=
0
;
i
<
comm_size
_server
;
++
i
)
{
// every server rank gets the same amount
local_vect_sizes_server
[
i
]
=
global_vect_size
/
ranks
_server
;
local_vect_sizes_server
[
i
]
=
global_vect_size
/
comm_size
_server
;
// let n be the rest of this division
// the first n server ranks get one more to split the rest fair up...
size_t
n_rest
=
global_vect_size
-
size_t
(
global_vect_size
/
ranks
_server
)
*
ranks
_server
;
size_t
n_rest
=
global_vect_size
-
size_t
(
global_vect_size
/
comm_size
_server
)
*
comm_size
_server
;
if
(
size_t
(
i
)
<
n_rest
)
{
local_vect_sizes_server
[
i
]
++
;
...
...
server/server.cxx
View file @
517a4e38
...
...
@@ -2,6 +2,7 @@
// TODO 1. refactoring
// TODO: use zmq cpp??
// TODO 2. error prone ness
// TODO: check ret values!
// TODO 3. check with real world sim and DA.
#include <map>
...
...
@@ -56,7 +57,7 @@ struct Part
struct
EnsembleMember
{
vector
<
double
>
state_analysis
;
// really tODO use vector! vector.data() will give the same... raw pointer!
vector
<
double
>
state_analysis
;
vector
<
double
>
state_background
;
size_t
received_state_background
=
0
;
...
...
@@ -94,7 +95,6 @@ struct Field {
ensemble_members
.
resize
(
ensemble_size_
);
}
// TODO: naming: server_comm size or ranks_server? same for simu!
/// Calculates all the state vector parts that are send between the server and the
/// simulations
void
calculate_parts
(
int
server_comm_size
)
...
...
@@ -186,7 +186,7 @@ struct ConnectedSimulationRank {
/// returns true if a new was sent.
bool
try_
to_start
_task
(
int
simu_rank
)
{
bool
try_
send
_task
(
int
simu_rank
)
{
// already working?
if
(
current_task
!=
WANT_WORK
)
...
...
@@ -299,9 +299,9 @@ struct Simulation // Model process runner
}
}
void
try_
to_start
_task
()
{
//
todo
: replace fields.begin() by field as there will be only on field soon.
void
try_
send
_task
()
{
//
low
: replace fields.begin() by field as there will be only on field soon.
for
(
auto
cs
=
connected_simulation_ranks
.
begin
();
cs
!=
connected_simulation_ranks
.
end
();
cs
++
)
{
cs
->
second
.
try_
to_start
_task
(
cs
->
first
);
cs
->
second
.
try_
send
_task
(
cs
->
first
);
}
}
...
...
@@ -671,6 +671,7 @@ int main(int argc, char * argv[])
zmq_msg_size
(
&
data_msg
),
part
.
send_count
*
sizeof
(
double
),
field_name
,
simu_id
,
simu_rank
,
simu_state_id
,
simu_timestamp
);
D
(
"local server offset %lu, sendcount=%lu"
,
part
.
local_offset_server
,
part
.
send_count
);
D
(
"values[0] = %.3f"
,
reinterpret_cast
<
double
*>
(
zmq_msg_data
(
&
data_msg
))[
0
]);
if
(
simu_timestamp
==
current_timestamp
)
{
...
...
@@ -695,7 +696,7 @@ int main(int argc, char * argv[])
zmq_msg_close
(
&
data_msg
);
// Check if we can answer directly with new data... means starting of a new model task
bool
got_task
=
simu
.
connected_simulation_ranks
[
simu_rank
].
try_
to_start
_task
(
simu_rank
);
bool
got_task
=
simu
.
connected_simulation_ranks
[
simu_rank
].
try_
send
_task
(
simu_rank
);
// If we could not start a new model task try to schedule a new one. This is initiated by server rank 0
if
(
!
got_task
&&
comm_rank
==
0
)
...
...
@@ -704,7 +705,7 @@ int main(int argc, char * argv[])
schedule_new_task
(
simu_id
);
// try to run it directly:
simu
.
connected_simulation_ranks
[
simu_rank
].
try_
to_start
_task
(
simu_rank
);
simu
.
connected_simulation_ranks
[
simu_rank
].
try_
send
_task
(
simu_rank
);
}
}
...
...
@@ -740,7 +741,7 @@ int main(int argc, char * argv[])
{
if
(
schedule_new_task
(
simu_it
->
first
))
{
// normally we arrive always here if there are not more model task runners than ensemble members.
simu_it
->
second
.
try_
to_start
_task
();
simu_it
->
second
.
try_
send
_task
();
}
}
}
...
...
@@ -748,8 +749,6 @@ int main(int argc, char * argv[])
}
// TODO: check ret values!
// TODO: remove compile warnings!
if
(
phase
==
PHASE_SIMULATION
&&
comm_rank
!=
0
)
{
...
...
@@ -766,7 +765,7 @@ int main(int argc, char * argv[])
simu
.
addTask
(
new_task
.
task_id
,
new_task
.
task
);
// If so try to start them directly.
simu
.
try_
to_start
_task
();
simu
.
try_
send
_task
();
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment