Tutorial

Example

Import

Import the required packages and select the propagation model:

import torch
from torch_geometric.datasets import Planetoid # or other datasets
from torch_geometric.utils import degree
from fs_gplib.Epidemics import SIRModel

Data

Read the graph data, using the data from the PyG package, which has a compressed matrix row structure.

dataset = Planetoid(root='./dataset', name='Cora')
data = dataset[0]

If the data is not in the compressed matrix row structure, it needs to be converted. See Data for the data structure construction.

If the data is a Networkx.Graph(), it can be converted using from_networkx.

import networkx as nx
from torch_geometric.utils import from_networkx

G = nx.florentine_families_graph()
data = from_networkx(G)

mapping = dict(zip(G.nodes(), range(G.number_of_nodes())))

Note

Note that the encoding methods for the two data formats are different, and the mapping needs to be used for comparison.

Seeds

The format of the seed nodes can be list or float.

The seed set can be calculated through various algorithms or specified by the user, with the following simple example selecting the top 10% of nodes based on degree centrality:

def Degree_centrality_seeds(data, num_seeds):
    target = data.edge_index[1]
    N = data.num_nodes
    D = degree(target, N)
    _, index = torch.sort(D, descending=True)
    seeds = index[:num_seeds]
    return seeds.tolist()

num_seeds = int(data.num_nodes*0.1)
seeds = Degree_centrality_seeds(data, num_seeds)

Also, the seed set can be generated randomly. See the next section for details.

Instantiate Model

After importing the data, the seed set, and confirming the model parameters, build the model:

i_beta=0.01 # infection
r_lambda=0.005 # recovery
device = 1 # device
model = SIRModel(data, seeds, i_beta, r_lambda, device)

The parameters required for each model are described in the model introduction section.

If there is no seed set, the seed set can be generated randomly by setting seeds to float, and the corresponding proportion of seed nodes can be generated:

seeds = 0.3 # ratio of seeds
i_beta=0.01 # infection
r_lambda=0.005 # recovery
device = 1 # device
model = SIRModel(data, seeds, i_beta, r_lambda, device)

# check the seeds set
print(model.seeds)

If you want to make the randomly generated seeds reproducible, please add a random seed when building the model:

seeds = 0.3 # ratio of seeds
i_beta=0.01 # infection
r_lambda=0.005 # recovery
device = 1 # device
model = SIRModel(data, seeds, i_beta, r_lambda, device, rand_seed=33)

# check the seeds set
print(model.seeds)

Execute simulation

There are four different running interfaces:

  • Model.run_iteration(): Execute one time step from the current node state and return the updated node state.

  • Model.run_iterations(times): Execute multiple time steps starting from the current node state and return the final node state after all iterations.

  • Model.run_epoch(times): Execute multiple time steps starting from the initial state and return the final node state after all iterations.

  • Model.run_epochs(epochs, times, batch_size): Perform multiple Monte Carlo simulations in batches, each starting from the initial state, and return the final node states.

An example of running:

MC = 10000 # simulation times
it = 100 # iteration times
bs = 2000 # batch size
finals = model.run_epochs(MC, it, bs)

Example code 1: SIRModel

A complete example:

import torch
from torch_geometric.datasets import Planetoid
from torch_geometric.utils import degree
from fs_gplib.Epidemics import SIRModel

def Degree_centrality_seeds(data, num_seeds):
    target = data.edge_index[1]
    N = data.num_nodes
    D = degree(target, N)
    _, index = torch.sort(D, descending=True)
    seeds = index[:num_seeds]
    return seeds.tolist()

dataset = Planetoid(root='./dataset', name='Cora')
data = dataset[0]
num_seeds = int(data.num_nodes*0.1)
seeds = Degree_centrality_seeds(data, num_seeds)

i_beta=0.01
r_lambda=0.005
device = 1
model = SIRModel(data, seeds, i_beta, r_lambda, device)

MC = 10000
it = 100
bs = 2000
finals = model.run_epochs(MC, it, bs)

count = (finals>0).sum().item()/MC
print(f'Final spread range is {count}')

Example code 2: DySIRModel

import torch
import random
from torch_geometric.datasets import BitcoinOTC
import matplotlib.pyplot as plt
import numpy as np

from fs_gplib.Dynamic import DySIRModel

def plot_dy_sir(finals):
    # finals.shape: (num_timesteps (T), MC, num_nodes)
    if isinstance(finals, torch.Tensor):
        finals_np = finals.cpu().numpy()
    else:
        finals_np = np.array(finals)

    T = finals_np.shape[0]
    MC = finals_np.shape[1]
    N = finals_np.shape[2]

    all_states = np.unique(finals_np)
    avg_state_cnt = []
    for t in range(T):
        state_cnt = []
        for s in all_states:
            # (MC, N) == s -> sum over nodes, then mean over MC runs
            count = np.sum(finals_np[t] == s, axis=1)
            state_cnt.append(np.mean(count))
        avg_state_cnt.append(state_cnt)
    avg_state_cnt = np.array(avg_state_cnt)  # (T, num_state)

    for idx, s in enumerate(all_states):
        plt.plot(range(T), avg_state_cnt[:, idx], label=f"State {s}")

    plt.xlabel("Time Step")
    plt.ylabel("Average #Nodes")
    plt.title("Average Number of Nodes per State Over Time")
    plt.legend(["S", "I", "R"])
    plt.tight_layout()
    plt.show()

dataset = BitcoinOTC(root="./dataset")

data = dataset[0]
seeds = data.edge_index[1].unique().tolist()

# use weight or not
use_weight = False
if use_weight:
    print("Using weight")
    random.seed(42)
    edge_attr = torch.tensor(
        [random.random() for _ in range(data.edge_index.shape[1])]
    )
    data.edge_attr = edge_attr

infection_beta = 0.1
recovery_lambda = 0.05
device = 'cpu' #0
MC = 1000

x = torch.arange(data.num_nodes)
edge_index_list = [dataset[i].edge_index for i in range(len(dataset))]

model = DySIRModel(x, edge_index_list, seeds, infection_beta, recovery_lambda, device, use_weight)
finals = model.run_epochs(MC, batch_size=100)

plot_dy_sir(finals)

The Output:

result for DySIRModel

About Batch

The core of the batch parallel algorithm for macroscopic acceleration, used in Model.run_epochs(epochs, times, batch_size).

How to use batch

Batch parallel allows multiple simulations to run simultaneously, but the batch size cannot be increased indefinitely because it is constrained by hardware resources. If the batch size is too large, out-of-memory errors may occur. If you are interested in the impact of batch size on running efficiency, you can refer to the experimental results in Section 5.3.2 [1]. Users can find the appropriate batch size through an exponential decrease.

About Distributed

The distributed method is suitable for extremely large data that cannot be processed on a single machine. The core of the distributed method is to divide the data by the target nodes, so that the data volume between subgraphs is not skewed, and only one synchronization is performed at each time step.

This method can be used to simulate the propagation of extremely large graphs on single-card devices, multi-card devices, and multi-machine multi-card devices.

How to use distributed computing

The algorithm library does not include distributed code, but provides a data division method.

An implementation example is given:

import argparse
import os
import numpy as np

import torch
from torch_geometric.datasets import Planetoid
from torch_geometric.utils import degree
import torch.distributed as dist
from tqdm import tqdm

from fs_gplib.Epidemics import SIRModel
from fs_gplib.DistributedComputing.DistributedComputing import GraphPartitioner, load_partition

def parse_args():
    parser = argparse.ArgumentParser(description="Distributed SIR Simulation (fs-gplib)")
    parser.add_argument("-d", "--dataset", default="Yelp", choices=["Cora", "PubMed"], help="Dataset name")

    parser.add_argument("-p", "--parts", type=int, default=4, help="Number of partitions / processes")
    parser.add_argument("-s", "--steps", type=int, default=100, help="Number of iterations per MC run")
    parser.add_argument("-mc", "--MC", type=int, default=1000, help="Number of Monte Carlo simulations")
    parser.add_argument("-rp", "--read_pt", action="store_true", help="Read existing .pt partition files instead of re-partitioning")
    return parser.parse_args()


def get_degree_seeds(data, num_seeds: int):
    deg = degree(data.edge_index[1], data.num_nodes, dtype=torch.float)
    deg, idx = torch.sort(deg, descending=True)
    return idx[:num_seeds].tolist()

def load_data(dataset_name: str):
    if dataset_name == "Cora":
        dataset = Planetoid(root="./dataset", name="Cora")
    elif dataset_name == "PubMed":
        dataset = Planetoid(root="./dataset", name="PubMed")
    else:
        raise ValueError(f"Unsupported dataset: {dataset_name}")
    return dataset[0]


def main():
    args = parse_args()

    infection_beta = 0.01
    recovery_lambda = 0.005

    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    dist.init_process_group(backend="nccl", init_method="env://", device_id=torch.device(f"cuda:{local_rank}"))

    rank = dist.get_rank()
    world_size = dist.get_world_size()
    device_id = rank % torch.cuda.device_count()

    if rank == 0:
        final_count = []
        data = load_data(args.dataset)
        print(f"Loaded dataset {args.dataset}, edge_index device = {data.edge_index.device}")
        seeds = get_degree_seeds(data, int(data.num_nodes * 0.1))

        if not args.read_pt:
            print("Start graph partitioning...")
            t_s = time.time()
            partitioner = GraphPartitioner(
                data,
                n_parts=world_size,
                root=f"./dataset/{args.dataset}",
            )
            partitioner.generate_partition()
            print(f"Partitioning done in {time.time() - t_s:.2f} s")
    else:
        seeds = None

    dist.barrier()

    seed_list = [seeds] if rank == 0 else [None]
    dist.broadcast_object_list(seed_list, src=0)
    seeds = seed_list[0]

    sub_data, sub_targets = load_partition(f"./dataset/{args.dataset}", rank)

    model = SIRModel(
        sub_data,
        seeds=seeds,
        infection_beta=infection_beta,
        recovery_lambda=recovery_lambda,
        device=device_id,
    )

    mask = torch.ones_like(model.node_status["SI"], dtype=torch.bool)
    mask[sub_targets] = False

    bar = tqdm(range(args.MC)) if rank == 0 else range(args.MC)

    torch.cuda.reset_peak_memory_stats(device_id)

    for i, _ in enumerate(bar):
        model._init_node_status()
        if rank == 0:
            bar.set_description(f"MC {i}")

        for step in range(args.steps):
            model.run_iteration()

            synced_status = {}
            for key, value in model.node_status.items():
                value[mask] = False
                int_status = value.to(dtype=torch.uint8)

                dist.all_reduce(int_status, op=dist.ReduceOp.SUM)
                torch.cuda.synchronize(device_id)

                synced_status[key] = int_status.bool()

            model.node_status = synced_status

        if rank == 0:
            final_state = model._return_final(model.node_status.values())
            infected_count = (final_state == 1).sum().item()
            final_count.append(infected_count)

    if rank == 0:
        print(f"[Rank {rank}] Final infected count: {np.mean(final_count)}")
        print(f'max: {np.max(final_count)}')
        print(f'min: {np.min(final_count)}')

    if dist.is_initialized():
        dist.destroy_process_group()

if __name__ == "__main__":
    main()

The running method of this code is:

CUDA_VISIBLE_DEVICES=1,2,3,4,5,6 torchrun --nnodes=1 --nproc_per_node=6 --node_rank=0 --master_addr=127.0.0.1 --master_port=12345 test_DC.py -d Webbase -s 100 -mc 1000

Command line interpretation:

  • CUDA_VISIBLE_DEVICES=1,2,3,4,5,6

    Restricts the script to only use GPUs with device IDs 1 through 6. These are the GPU IDs made visible to the process.

  • torchrun

    The PyTorch launcher used for distributed training/simulation across multiple processes or GPUs.

  • --nnodes=1

    Specifies that only one physical machine (node) is involved in the computation.

  • --nproc_per_node=6

    Launches 6 separate processes on the single node, typically one per GPU (corresponding to the 6 visible GPUs).

  • --node_rank=0

    Indicates the rank of this node in a multi-node setup. Since there’s only one node, it’s rank 0.

  • --master_addr=127.0.0.1

    Sets the IP address of the master node (localhost in this case).

  • --master_port=12345

    Sets the port used for inter-process communication.

  • test_DC.py

    The Python script to be executed, which performs a distributed SIR simulation.

  • -d Webbase

    Specifies the dataset to use (Webbase in this case).

  • -s 100

    Sets the number of simulation steps per Monte Carlo run to 100.

  • -mc 1000

    Specifies that 1000 Monte Carlo simulations will be run.

References