From a47eaafdc79342ba15572433bc36fc94d4861aaa Mon Sep 17 00:00:00 2001
From: Mathieu Faverge <mathieu.faverge@inria.fr>
Date: Wed, 2 Dec 2020 18:20:37 +0100
Subject: [PATCH] MergeDuplicate: update for distributed

---
 CMakeLists.txt             |   2 +-
 src/spm.c                  |  46 +++++-
 src/z_spm.c                | 318 -------------------------------------
 src/z_spm_mergeduplicate.c | 148 +++++++++++++++++
 4 files changed, 187 insertions(+), 327 deletions(-)
 delete mode 100644 src/z_spm.c
 create mode 100644 src/z_spm_mergeduplicate.c

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 66834ebf..c7ee734b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -191,7 +191,6 @@ add_custom_target(spm_headers_tgt
 set(generated_sources "")
 
 set(SOURCES
-  src/z_spm.c
   src/z_spm_2dense.c
   src/z_spm_dof_extend.c
   src/z_spm_norm.c
@@ -206,6 +205,7 @@ set(SOURCES
   src/z_spm_integer.c
   src/z_spm_laplacian.c
   src/z_spm_matrixvector.c
+  src/z_spm_mergeduplicate.c
   src/z_spm_print.c
   src/z_spm_sort.c
   )
diff --git a/src/spm.c b/src/spm.c
index a0cff78b..5ff4ec5e 100644
--- a/src/spm.c
+++ b/src/spm.c
@@ -545,9 +545,10 @@ spmSort( spmatrix_t *spm )
  *
  * @brief Merge multiple entries in a spm by summing their values together.
  *
- * The sparse matrix needs to be sorted first (see spmSort()).
+ * The sparse matrix needs to be sorted first (see z_spmSort()). In distributed,
+ * only local entries are merged together.
  *
- * @warning Not implemented for CSR and IJV format.
+ * @warning Not implemented for IJV format.
  *
  *******************************************************************************
  *
@@ -565,25 +566,54 @@ spmSort( spmatrix_t *spm )
 spm_int_t
 spmMergeDuplicate( spmatrix_t *spm )
 {
+    spm_int_t local, global;
+
     switch (spm->flttype) {
     case SpmPattern:
-        return p_spmMergeDuplicate( spm );
+        local = p_spmMergeDuplicate( spm );
+        break;
 
     case SpmFloat:
-        return s_spmMergeDuplicate( spm );
+        local = s_spmMergeDuplicate( spm );
+        break;
 
     case SpmDouble:
-        return d_spmMergeDuplicate( spm );
+        local = d_spmMergeDuplicate( spm );
+        break;
 
     case SpmComplex32:
-        return c_spmMergeDuplicate( spm );
+        local = c_spmMergeDuplicate( spm );
+        break;
 
     case SpmComplex64:
-        return z_spmMergeDuplicate( spm );
+        local = z_spmMergeDuplicate( spm );
+        break;
 
     default:
-        return SPM_ERR_BADPARAMETER;
+        return (spm_int_t)SPM_ERR_BADPARAMETER;
+    }
+
+#if defined(SPM_WITH_MPI)
+    if ( spm->loc2glob ) {
+        MPI_Allreduce( &local, &global, 1, SPM_MPI_INT, MPI_SUM, spm->comm );
+
+        /* Update computed fields */
+        if( global > 0 ) {
+            MPI_Allreduce( &(spm->nnz),    &(spm->gnnz),    1, SPM_MPI_INT, MPI_SUM, spm->comm );
+            MPI_Allreduce( &(spm->nnzexp), &(spm->gnnzexp), 1, SPM_MPI_INT, MPI_SUM, spm->comm );
+        }
     }
+    else
+#endif
+    {
+        global = local;
+        if ( global > 0 ) {
+            spm->gnnz    = spm->nnz;
+            spm->gnnzexp = spm->nnzexp;
+        }
+    }
+
+    return global;
 }
 
 /**
diff --git a/src/z_spm.c b/src/z_spm.c
deleted file mode 100644
index 273593db..00000000
--- a/src/z_spm.c
+++ /dev/null
@@ -1,318 +0,0 @@
-/**
- *
- * @file z_spm.c
- *
- * SParse Matrix package precision dependent routines.
- *
- * @copyright 2016-2020 Bordeaux INP, CNRS (LaBRI UMR 5800), Inria,
- *                      Univ. Bordeaux. All rights reserved.
- *
- * @version 1.0.0
- * @author Mathieu Faverge
- * @date 2019-10-29
- *
- * @precisions normal z -> c d s p
- *
- **/
-#include "common.h"
-#include "z_spm.h"
-#include <string.h>
-
-/**
- *******************************************************************************
- *
- * @ingroup spm_dev_check
- *
- * @brief This routine merge the multiple entries in a sparse
- * matrix by suming their values together.
- *
- * The sparse matrix needs to be sorted first (see z_spmSort()).
- *
- * @warning This function should NOT be called if dof is greater than 1.
- *
- *******************************************************************************
- *
- * @param[inout] spm
- *          On entry, the pointer to the sparse matrix structure.
- *          On exit, the reduced sparse matrix of multiple entries were present
- *          in it. The multiple values for a same vertex are sum up together.
- *
- ********************************************************************************
- *
- * @return The number of vertices that were merged. -1 on error.
- *
- *******************************************************************************/
-spm_int_t
-z_spmMergeDuplicate( spmatrix_t *spm )
-{
-    spm_int_t       *colptr = spm->colptr;
-    spm_int_t       *oldrow = spm->rowptr;
-    spm_int_t       *newrow = spm->rowptr;
-    spm_complex64_t *newval = spm->values;
-    spm_complex64_t *oldval = spm->values;
-    spm_int_t n       = spm->n;
-    spm_int_t baseval = spm->colptr[0];
-    spm_int_t dof2    = spm->dof * spm->dof;
-    spm_int_t idx, i, j, size, savedcolptr;
-    spm_int_t merge = 0;
-#if !defined(PRECISION_p)
-    spm_int_t d;
-#endif
-
-    assert( spm->dof >= 1 );
-    assert( spm->fmttype == SpmCSC );
-
-    if ( spm->fmttype == SpmCSC ) {
-        idx = baseval;
-        savedcolptr = colptr[0];
-        for (i=0; i<n; i++, colptr++)
-        {
-            size = colptr[1] - savedcolptr;
-            savedcolptr = colptr[1];
-
-            for (j = 0; j < size;
-                 j++, oldrow++, oldval+=dof2, newrow++, newval+=dof2, idx++)
-            {
-                if ( newrow != oldrow ) {
-                    newrow[0] = oldrow[0];
-#if !defined(PRECISION_p)
-                    memcpy( newval, oldval, dof2 * sizeof(spm_complex64_t) );
-#endif
-                }
-
-                while( ((j+1) < size) && (newrow[0] == oldrow[1]) ) {
-                    j++;
-                    oldrow++;
-                    oldval += dof2;
-#if !defined(PRECISION_p)
-                    /* Merge the two sets of values */
-                    for (d = 0; d < dof2; d++) {
-                        newval[d] += oldval[d];
-                    }
-#endif
-                    merge++;
-                }
-            }
-            assert( ( (merge == 0) && (colptr[1] == idx) ) ||
-                    ( (merge != 0) && (colptr[1] >  idx) ) );
-
-            colptr[1] = idx;
-        }
-        assert( ((merge == 0) && (spm->nnz         == (idx-baseval))) ||
-                ((merge != 0) && (spm->nnz - merge == (idx-baseval))) );
-
-        if (merge > 0) {
-            spm->nnz = spm->nnz - merge;
-            spm->gnnz = spm->nnz;
-
-            newrow = malloc( spm->nnz * sizeof(spm_int_t) );
-            memcpy( newrow, spm->rowptr, spm->nnz * sizeof(spm_int_t) );
-            free(spm->rowptr);
-            spm->rowptr = newrow;
-
-#if !defined(PRECISION_p)
-            newval = malloc( spm->nnz * dof2 * sizeof(spm_complex64_t) );
-            memcpy( newval, spm->values, spm->nnz * dof2 * sizeof(spm_complex64_t) );
-            free(spm->values);
-            spm->values = newval;
-#endif
-        }
-    }
-
-    return merge;
-}
-
-/**
- *******************************************************************************
- *
- * @ingroup spm_dev_check
- *
- * @brief This routine corrects the sparse matrix structure if it's
- * pattern is not symmetric.
- *
- * It returns the new symmetric pattern with zeroes on
- * the new entries.
- *
- *******************************************************************************
- *
- * @param[inout] spm
- *          On entry, the pointer to the sparse matrix structure.
- *          On exit, the same sparse matrix with extra entries that makes it
- *          pattern symmetric.
- *
- *******************************************************************************
- *
- * @retval Return the number of entries added to the matrix.
- *
- *******************************************************************************/
-spm_int_t
-z_spmSymmetrize( spmatrix_t *spm )
-{
-    spm_complex64_t *oldval, *valtmp, *newval = NULL;
-    spm_int_t *oldcol, *coltmp, *newcol = NULL;
-    spm_int_t *oldrow, *rowtmp, *newrow = NULL;
-    spm_int_t *toaddtab = NULL;
-    spm_int_t *toaddtmp, toaddcnt, toaddsze;
-    spm_int_t  n       = spm->n;
-    spm_int_t  dof2    = spm->dof * spm->dof;
-    spm_int_t  i, j, k, r, size;
-    spm_int_t  baseval;
-
-    toaddcnt = 0;
-    toaddsze = 0;
-
-    if ( (spm->fmttype == SpmCSC) || (spm->fmttype == SpmCSR) ) {
-        if (spm->fmttype == SpmCSC) {
-            oldcol = spm->colptr;
-            coltmp = spm->colptr;
-            oldrow = spm->rowptr;
-            rowtmp = spm->rowptr;
-        }
-        else {
-            oldcol = spm->rowptr;
-            coltmp = spm->rowptr;
-            oldrow = spm->colptr;
-            rowtmp = spm->colptr;
-        }
-
-        baseval  = oldcol[0];
-        for (i=0; i<n; i++, coltmp++)
-        {
-            size = coltmp[1] - coltmp[0];
-            for (r=0; r<size; r++, rowtmp++ )
-            {
-                j = rowtmp[0]-baseval;
-                if ( i != j ) {
-                    /* Look for the element (j, i) */
-                    spm_int_t frow = oldcol[ j ] - baseval;
-                    spm_int_t lrow = oldcol[ j+1 ] - baseval;
-                    int found = 0;
-
-                    for (k = frow; k < lrow; k++)
-                    {
-                        if (i == (oldrow[k]-baseval))
-                        {
-                            found = 1;
-                            break;
-                        }
-                        else if ( i < (oldrow[k]-baseval))
-                        {
-                            /* The spm is sorted so we will not find it later */
-                            break;
-                        }
-                    }
-
-                    if ( !found ) {
-                        if ( newcol == NULL ) {
-                            newcol = malloc( (spm->n+1) * sizeof(spm_int_t) );
-                            for (k=0; k<spm->n; k++) {
-                                newcol[k] = oldcol[k+1] - oldcol[k];
-                            }
-
-                            /* Let's start with a buffer that will contain 5% of extra elements */
-                            toaddsze = spm_imax( 0.05 * (double)spm->nnz, 1 );
-                            toaddtab = malloc( 2*toaddsze * sizeof(spm_int_t) );
-                        }
-
-                        if (toaddcnt >= toaddsze) {
-                            toaddsze *= 2;
-                            toaddtab = (spm_int_t*)realloc( toaddtab, 2*toaddsze*sizeof(spm_int_t) );
-                        }
-
-                        /* Newcol stores the number of elements per column */
-                        newcol[ j ]++;
-                        /* toaddtab stores the couples (j, i) to be added in the final sparse matrix */
-                        toaddtab[ toaddcnt * 2     ] = j;
-                        toaddtab[ toaddcnt * 2 + 1 ] = i;
-                        toaddcnt++;
-                    }
-                }
-            }
-        }
-
-        if (toaddcnt > 0) {
-            spm_int_t newsize, oldsize;
-
-            /* Sort the array per column */
-            spmIntSort2Asc1(toaddtab, toaddcnt);
-
-            spm->nnz = spm->nnz + toaddcnt;
-            spm->gnnz = spm->nnz;
-
-            newrow = malloc( spm->nnz        * sizeof(spm_int_t) );
-#if !defined(PRECISION_p)
-            newval = malloc( spm->nnz * dof2 * sizeof(spm_complex64_t) );
-#endif
-            coltmp = newcol;
-            rowtmp = newrow;
-            valtmp = newval;
-            oldval = spm->values;
-            toaddtmp = toaddtab;
-
-            newsize = coltmp[0];
-            coltmp[0] = baseval;
-            for (i=0; i<n; i++, coltmp++, oldcol++)
-            {
-                /* Copy the existing elements */
-                oldsize = oldcol[1] - oldcol[0];
-                memcpy( rowtmp, oldrow, oldsize * sizeof(spm_int_t) );
-#if !defined(PRECISION_p)
-                memcpy( valtmp, oldval, oldsize * dof2 * sizeof(spm_complex64_t) );
-#endif
-
-                oldrow += oldsize;
-                rowtmp += oldsize;
-                oldval += oldsize * dof2;
-                valtmp += oldsize * dof2;
-
-                /* Some elements have been added */
-                assert( newsize >= oldsize );
-                if ( newsize > oldsize ) {
-                    int added = 0;
-                    int tobeadded = newsize - oldsize;
-
-                    /* At least one element is equal to i */
-                    assert( toaddtmp[0] == i );
-
-                    /* Let's add the new vertices */
-                    while( (added < tobeadded) && (toaddtmp[0] == i) )
-                    {
-                        rowtmp[0] = toaddtmp[1] + baseval;
-                        rowtmp++;
-                        toaddtmp+=2;
-                        added++;
-                    }
-                    assert( added == tobeadded );
-
-#if !defined(PRECISION_p)
-                    /* Add 0.s for the associated values */
-                    memset( valtmp, 0, added * dof2 * sizeof(spm_complex64_t) );
-                    valtmp += added * dof2;
-#endif
-                }
-
-                /* Use oldsize as temporary variable to update the new colptr */
-                oldsize = newsize;
-                newsize = coltmp[1];
-                coltmp[1] = coltmp[0] + oldsize;
-            }
-
-            assert( coltmp[0]-baseval == spm->nnz );
-            free( toaddtab );
-            free( spm->colptr );
-            free( spm->rowptr );
-            free( spm->values );
-            if (spm->fmttype == SpmCSC) {
-                spm->colptr = newcol;
-                spm->rowptr = newrow;
-            }
-            else {
-                spm->colptr = newrow;
-                spm->rowptr = newcol;
-            }
-            spm->values = newval;
-        }
-    }
-
-    return toaddcnt;
-}
diff --git a/src/z_spm_mergeduplicate.c b/src/z_spm_mergeduplicate.c
new file mode 100644
index 00000000..b673f003
--- /dev/null
+++ b/src/z_spm_mergeduplicate.c
@@ -0,0 +1,148 @@
+/**
+ *
+ * @file z_spm_mergeduplicate.c
+ *
+ * SParse Matrix package precision dependent routines.
+ *
+ * @copyright 2016-2020 Bordeaux INP, CNRS (LaBRI UMR 5800), Inria,
+ *                      Univ. Bordeaux. All rights reserved.
+ *
+ * @version 1.0.0
+ * @author Mathieu Faverge
+ * @author Tony Delarue
+ * @date 2019-10-29
+ *
+ * @precisions normal z -> c d s p
+ *
+ **/
+#include "common.h"
+#include "z_spm.h"
+#include <string.h>
+
+/**
+ *******************************************************************************
+ *
+ * @ingroup spm_dev_check
+ *
+ * @brief This routine merge the multiple entries in a sparse matrix by summing
+ * their values together.
+ *
+ * The sparse matrix needs to be sorted first (see z_spmSort()). In distributed,
+ * only local entries are merged together.
+ *
+ *******************************************************************************
+ *
+ * @param[inout] spm
+ *          On entry, the pointer to the sparse matrix structure.
+ *          On exit, the reducton of the input sparse matrix where multiple
+ *          occurences of a same element are summed up together.
+ *
+ ********************************************************************************
+ *
+ * @return The number of vertices that were merged. -1 on error.
+ *
+ *******************************************************************************/
+spm_int_t
+z_spmMergeDuplicate( spmatrix_t *spm )
+{
+    spm_int_t       *colptr   = (spm->fmttype == SpmCSC) ? spm->colptr : spm->rowptr;
+    spm_int_t       *oldrow   = (spm->fmttype == SpmCSC) ? spm->rowptr : spm->colptr;
+    spm_int_t       *newrow   = oldrow;
+    spm_complex64_t *newval   = spm->values;
+    spm_complex64_t *oldval   = spm->values;
+    spm_int_t       *loc2glob = spm->loc2glob;
+
+    spm_int_t merge   = 0;
+    spm_int_t n       = spm->n;
+    spm_int_t baseval = spmFindBase( spm );
+    spm_int_t ig, jl, jg, dofi, dofj, dof2;
+    spm_int_t k, idx, size, valsize, savedcolptr;
+#if !defined(PRECISION_p)
+    spm_int_t d;
+#endif
+
+    if ( (spm->fmttype != SpmCSC) &&
+         (spm->fmttype != SpmCSR) )
+    {
+        fprintf(stderr, "Error : MergeDuplicate can only be called with SpmCSC or SpmCSR\n");
+        return SPM_ERR_BADPARAMETER;
+    }
+
+    idx = baseval;
+    valsize = 0;
+    savedcolptr = colptr[0];
+    for (jl=0; jl<n; jl++, colptr++, loc2glob++)
+    {
+        jg   = (spm->loc2glob == NULL) ? jl : *loc2glob - baseval;
+        dofj = (spm->dof > 0) ? spm->dof : spm->dofs[jg+1] - spm->dofs[jg];
+        size = colptr[1] - savedcolptr;
+        savedcolptr = colptr[1];
+
+        for ( k=0; k<size; k++, idx++ )
+        {
+            ig   = *newrow - baseval;
+            dofi = (spm->dof > 0) ? spm->dof : spm->dofs[ig+1] - spm->dofs[ig];
+            dof2 = dofi * dofj;
+            valsize += dof2;
+
+            /*
+             * A shift has been introduced, we need to first compact the structure
+             */
+            if ( newrow != oldrow ) {
+                newrow[0] = oldrow[0];
+#if !defined(PRECISION_p)
+                memcpy( newval, oldval, dof2 * sizeof(spm_complex64_t) );
+#endif
+            }
+
+            /*
+             * Let's sum together all identical elements
+             */
+            while( ((k+1) < size) && (newrow[0] == oldrow[1]) ) {
+                k++;
+                oldrow++;
+                oldval += dof2;
+#if !defined(PRECISION_p)
+                /* Merge the two sets of values */
+                for ( d=0; d<dof2; d++ ) {
+                    newval[d] += oldval[d];
+                }
+#endif
+                merge++;
+            }
+
+            /* Shift arrays */
+            oldrow++;
+            newrow++;
+            oldval += dof2;
+            newval += dof2;
+        }
+        assert( ( (merge == 0) && (colptr[1] == idx) ) ||
+                ( (merge != 0) && (colptr[1] >  idx) ) );
+
+        colptr[1] = idx;
+    }
+    assert( ((merge == 0) && (spm->nnz         == (idx-baseval))) ||
+            ((merge != 0) && (spm->nnz - merge == (idx-baseval))) );
+
+    /*
+     * Realloc the arrays if they have been compacted
+     */
+    if ( merge > 0 ) {
+        spm->nnz    = spm->nnz - merge;
+        spm->nnzexp = valsize;
+
+        if ( spm->fmttype == SpmCSC ) {
+            spm->rowptr = realloc( spm->rowptr, spm->nnz * sizeof( spm_int_t ) );
+        }
+        else {
+            spm->colptr = realloc( spm->colptr, spm->nnz * sizeof( spm_int_t ) );
+        }
+
+#if !defined(PRECISION_p)
+        spm->values = realloc( spm->values, valsize * sizeof( spm_complex64_t ) );
+#endif
+    }
+
+    return merge;
+}
-- 
GitLab