Thread: [PATCH,RFC] Add call_rcu_sched()

  1. [PATCH,RFC] Add call_rcu_sched()

    Hello!

    Rough first cut of patch to provide the call_rcu_sched() needed for
    Mathieu's markers implementation. This is to synchronize_sched()
    as call_rcu() is to synchronize_rcu().
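
    For illustration, usage is the same as call_rcu(), except that the
    callback is deferred until a synchronize_sched()-style grace period
    has elapsed. A minimal sketch follows (the data structure and free
    routine are hypothetical, not part of this patch; it assumes
    <linux/rcupdate.h> and <linux/slab.h>):

    struct my_data {
    	struct rcu_head rcu;
    	int value;
    };

    static void my_data_free_rcu(struct rcu_head *head)
    {
    	/* Runs only after all preempt-disabled readers have finished. */
    	kfree(container_of(head, struct my_data, rcu));
    }

    static void my_data_replace(struct my_data **slot, struct my_data *new)
    {
    	struct my_data *old = *slot;

    	rcu_assign_pointer(*slot, new);
    	if (old)
    		call_rcu_sched(&old->rcu, my_data_free_rcu);
    }

    Readers would access *slot with preemption (or irqs) disabled, which
    is exactly the class of critical section this flavor of grace period
    waits for.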

    Not for inclusion. Passes light testing, but should be treated with
    some caution given that no part of it is more than 24 hours old.
    Known/suspected shortcomings:

    o Only lightly tested -- haven't yet tried rcutorture.

    o Need to add call_rcu_sched() testing to rcutorture.

    o If I remember correctly, an rcu_barrier_sched() is required
    (Mathieu?).

    o Interaction of this patch with CPU hotplug should be viewed
    with great suspicion.

    Signed-off-by: Paul E. McKenney
    ---

    include/linux/rcuclassic.h | 3
    include/linux/rcupdate.h | 22 +++
    include/linux/rcupreempt.h | 15 ++
    init/main.c | 1
    kernel/rcupdate.c | 20 ---
    kernel/rcupreempt.c | 256 +++++++++++++++++++++++++++++++++++++++++++--
    6 files changed, 288 insertions(+), 29 deletions(-)

    diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcuclassic.h linux-2.6.25-rc6-call_rcu_sched/include/linux/rcuclassic.h
    --- linux-2.6.25-rc6/include/linux/rcuclassic.h 2008-03-16 17:45:16.000000000 -0700
    +++ linux-2.6.25-rc6-call_rcu_sched/include/linux/rcuclassic.h 2008-03-21 04:27:31.000000000 -0700
    @@ -153,7 +153,10 @@ extern struct lockdep_map rcu_lock_map;

    #define __synchronize_sched() synchronize_rcu()

    +#define call_rcu_sched(head, func) call_rcu(head, func)
    +
    extern void __rcu_init(void);
    +#define rcu_init_sched() do { } while (0)
    extern void rcu_check_callbacks(int cpu, int user);
    extern void rcu_restart_cpu(int cpu);

    diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcupdate.h linux-2.6.25-rc6-call_rcu_sched/include/linux/rcupdate.h
    --- linux-2.6.25-rc6/include/linux/rcupdate.h 2008-03-16 17:45:16.000000000 -0700
    +++ linux-2.6.25-rc6-call_rcu_sched/include/linux/rcupdate.h 2008-03-20 21:10:42.000000000 -0700
    @@ -42,6 +42,7 @@
    #include
    #include
    #include
    +#include

    /**
    * struct rcu_head - callback structure for use with RCU
    @@ -182,6 +183,27 @@ struct rcu_head {
    (p) = (v); \
    })

    +/* Infrastructure to implement the synchronize_() primitives. */
    +
    +struct rcu_synchronize {
    + struct rcu_head head;
    + struct completion completion;
    +};
    +
    +extern void wakeme_after_rcu(struct rcu_head *head);
    +
    +#define synchronize_rcu_xxx(name, func) \
    +void name(void) \
    +{ \
    + struct rcu_synchronize rcu; \
    + \
    + init_completion(&rcu.completion); \
    + /* Will wake me after RCU finished. */ \
    + func(&rcu.head, wakeme_after_rcu); \
    + /* Wait for it. */ \
    + wait_for_completion(&rcu.completion); \
    +}
    +
    /**
    * synchronize_sched - block until all CPUs have exited any non-preemptive
    * kernel code sequences.
    diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcupreempt.h linux-2.6.25-rc6-call_rcu_sched/include/linux/rcupreempt.h
    --- linux-2.6.25-rc6/include/linux/rcupreempt.h 2008-03-16 17:45:16.000000000 -0700
    +++ linux-2.6.25-rc6-call_rcu_sched/include/linux/rcupreempt.h 2008-03-21 04:31:29.000000000 -0700
    @@ -46,6 +46,20 @@
    #define rcu_bh_qsctr_inc(cpu)
    #define call_rcu_bh(head, rcu) call_rcu(head, rcu)

    +/**
    + * call_rcu_sched - Queue RCU callback for invocation after sched grace period.
    + * @head: structure to be used for queueing the RCU updates.
    + * @func: actual update function to be invoked after the grace period
    + *
    + * The update function will be invoked some time after a full
    + * synchronize_sched()-style grace period elapses, in other words after
    + * all currently executing preempt-disabled sections of code (including
    + * hardirq handlers, NMI handlers, and local_irq_save() blocks) have
    + * completed.
    + */
    +extern void call_rcu_sched(struct rcu_head *head,
    + void (*func)(struct rcu_head *head));
    +
    extern void __rcu_read_lock(void) __acquires(RCU);
    extern void __rcu_read_unlock(void) __releases(RCU);
    extern int rcu_pending(int cpu);
    @@ -57,6 +71,7 @@ extern int rcu_needs_cpu(int cpu);
    extern void __synchronize_sched(void);

    extern void __rcu_init(void);
    +extern void rcu_init_sched(void);
    extern void rcu_check_callbacks(int cpu, int user);
    extern void rcu_restart_cpu(int cpu);
    extern long rcu_batches_completed(void);
    diff -urpNa -X dontdiff linux-2.6.25-rc6/init/main.c linux-2.6.25-rc6-call_rcu_sched/init/main.c
    --- linux-2.6.25-rc6/init/main.c 2008-03-16 17:45:17.000000000 -0700
    +++ linux-2.6.25-rc6-call_rcu_sched/init/main.c 2008-03-21 04:31:31.000000000 -0700
    @@ -736,6 +736,7 @@ static void __init do_basic_setup(void)
    driver_init();
    init_irq_proc();
    do_initcalls();
    + rcu_init_sched();
    }

    static int __initdata nosoftlockup;
    diff -urpNa -X dontdiff linux-2.6.25-rc6/kernel/rcupdate.c linux-2.6.25-rc6-call_rcu_sched/kernel/rcupdate.c
    --- linux-2.6.25-rc6/kernel/rcupdate.c 2008-03-16 17:45:17.000000000 -0700
    +++ linux-2.6.25-rc6-call_rcu_sched/kernel/rcupdate.c 2008-03-20 21:10:39.000000000 -0700
    @@ -39,18 +39,12 @@
    #include
    #include
    #include
    -#include
    #include
    #include
    #include
    #include
    #include

    -struct rcu_synchronize {
    - struct rcu_head head;
    - struct completion completion;
    -};
    -
    static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
    static atomic_t rcu_barrier_cpu_count;
    static DEFINE_MUTEX(rcu_barrier_mutex);
    @@ -60,7 +54,7 @@ static struct completion rcu_barrier_com
    * Awaken the corresponding synchronize_rcu() instance now that a
    * grace period has elapsed.
    */
    -static void wakeme_after_rcu(struct rcu_head *head)
    +void wakeme_after_rcu(struct rcu_head *head)
    {
    struct rcu_synchronize *rcu;

    @@ -77,17 +71,7 @@ static void wakeme_after_rcu(struct rcu_
    * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
    * and may be nested.
    */
    -void synchronize_rcu(void)
    -{
    - struct rcu_synchronize rcu;
    -
    - init_completion(&rcu.completion);
    - /* Will wake me after RCU finished */
    - call_rcu(&rcu.head, wakeme_after_rcu);
    -
    - /* Wait for it */
    - wait_for_completion(&rcu.completion);
    -}
    +synchronize_rcu_xxx(synchronize_rcu, call_rcu)
    EXPORT_SYMBOL_GPL(synchronize_rcu);

    static void rcu_barrier_callback(struct rcu_head *notused)
    diff -urpNa -X dontdiff linux-2.6.25-rc6/kernel/rcupreempt.c linux-2.6.25-rc6-call_rcu_sched/kernel/rcupreempt.c
    --- linux-2.6.25-rc6/kernel/rcupreempt.c 2008-03-16 17:45:17.000000000 -0700
    +++ linux-2.6.25-rc6-call_rcu_sched/kernel/rcupreempt.c 2008-03-21 04:31:26.000000000 -0700
    @@ -46,6 +46,7 @@
    #include
    #include
    #include
    +#include
    #include
    #include
    #include
    @@ -87,9 +88,14 @@ struct rcu_data {
    struct rcu_head **nexttail;
    struct rcu_head *waitlist[GP_STAGES];
    struct rcu_head **waittail[GP_STAGES];
    - struct rcu_head *donelist;
    + struct rcu_head *donelist; /* from waitlist & waitschedlist */
    struct rcu_head **donetail;
    long rcu_flipctr[2];
    + struct rcu_head *nextschedlist;
    + struct rcu_head **nextschedtail;
    + struct rcu_head *waitschedlist;
    + struct rcu_head **waitschedtail;
    + int rcu_sched_sleeping;
    #ifdef CONFIG_RCU_TRACE
    struct rcupreempt_trace trace;
    #endif /* #ifdef CONFIG_RCU_TRACE */
    @@ -131,11 +137,24 @@ enum rcu_try_flip_states {
    rcu_try_flip_waitmb_state,
    };

    +/*
    + * States for rcu_ctrlblk.rcu_sched_sleep.
    + */
    +
    +enum rcu_sched_sleep_states {
    + rcu_sched_not_sleeping, /* Not sleeping, callbacks need GP. */
    + rcu_sched_sleep_prep, /* Thinking of sleeping, rechecking. */
    + rcu_sched_sleeping, /* Sleeping, awaken if GP needed. */
    +};
    +
    struct rcu_ctrlblk {
    spinlock_t fliplock; /* Protect state-machine transitions. */
    long completed; /* Number of last completed batch. */
    enum rcu_try_flip_states rcu_try_flip_state; /* The current state of
    the rcu state machine */
    + spinlock_t schedlock; /* Protect rcu_sched sleep state. */
    + enum rcu_sched_sleep_states sched_sleep; /* rcu_sched state. */
    + wait_queue_head_t sched_wq; /* Place for rcu_sched to sleep. */
    };

    static DEFINE_PER_CPU(struct rcu_data, rcu_data);
    @@ -143,8 +162,12 @@ static struct rcu_ctrlblk rcu_ctrlblk =
    .fliplock = __SPIN_LOCK_UNLOCKED(rcu_ctrlblk.fliplock),
    .completed = 0,
    .rcu_try_flip_state = rcu_try_flip_idle_state,
    + .schedlock = __SPIN_LOCK_UNLOCKED(rcu_ctrlblk.schedlock),
    + .sched_sleep = rcu_sched_not_sleeping,
    + .sched_wq = __WAIT_QUEUE_HEAD_INITIALIZER(rcu_ctrlblk.sched_wq),
    };

    +static struct task_struct *rcu_sched_grace_period_task;

    #ifdef CONFIG_RCU_TRACE
    static char *rcu_try_flip_state_names[] =
    @@ -871,6 +894,8 @@ void rcu_offline_cpu(int cpu)
    struct rcu_head *list = NULL;
    unsigned long flags;
    struct rcu_data *rdp = RCU_DATA_CPU(cpu);
    + struct rcu_head *schedlist = NULL;
    + struct rcu_head **schedtail = &schedlist;
    struct rcu_head **tail = &list;

    /*
    @@ -884,6 +909,11 @@ void rcu_offline_cpu(int cpu)
    rcu_offline_cpu_enqueue(rdp->waitlist[i], rdp->waittail[i],
    list, tail);
    rcu_offline_cpu_enqueue(rdp->nextlist, rdp->nexttail, list, tail);
    + rcu_offline_cpu_enqueue(rdp->waitschedlist, rdp->waitschedtail,
    + schedlist, schedtail);
    + rcu_offline_cpu_enqueue(rdp->nextschedlist, rdp->nextschedtail,
    + schedlist, schedtail);
    + rdp->rcu_sched_sleeping = 0;
    spin_unlock_irqrestore(&rdp->lock, flags);
    rdp->waitlistcount = 0;

    @@ -924,16 +954,35 @@ void rcu_offline_cpu(int cpu)
    *rdp->nexttail = list;
    if (list)
    rdp->nexttail = tail;
    + *rdp->nextschedtail = schedlist;
    + if (schedlist)
    + rdp->nextschedtail = schedtail;
    spin_unlock_irqrestore(&rdp->lock, flags);
    }

    void __devinit rcu_online_cpu(int cpu)
    {
    unsigned long flags;
    + struct rcu_data *rdp;

    spin_lock_irqsave(&rcu_ctrlblk.fliplock, flags);
    cpu_set(cpu, rcu_cpu_online_map);
    spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, flags);
    +
    + /*
    + * The rcu_sched grace-period processing might have bypassed
    + * this CPU, given that it was not in the rcu_cpu_online_map
    + * when the grace-period scan started. This means that the
    + * grace-period task might sleep. So make sure that if this
    + * should happen, the first callback posted to this CPU will
    + * wake up the grace-period task if need be.
    + */
    +
    + local_irq_save(flags);
    + rdp = RCU_DATA_ME();
    + spin_lock(&rdp->lock);
    + rdp->rcu_sched_sleeping = 1;
    + spin_unlock_irqrestore(&rdp->lock, flags);
    }

    #else /* #ifdef CONFIG_HOTPLUG_CPU */
    @@ -993,26 +1042,194 @@ void call_rcu(struct rcu_head *head, voi
    }
    EXPORT_SYMBOL_GPL(call_rcu);

    +void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
    +{
    + unsigned long flags;
    + struct rcu_data *rdp;
    + int wake_gp = 0;
    +
    + head->func = func;
    + head->next = NULL;
    + local_irq_save(flags);
    + rdp = RCU_DATA_ME();
    + spin_lock(&rdp->lock);
    + *rdp->nextschedtail = head;
    + rdp->nextschedtail = &head->next;
    + if (rdp->rcu_sched_sleeping) {
    +
    + /* Grace-period processing might be sleeping... */
    +
    + rdp->rcu_sched_sleeping = 0;
    + wake_gp = 1;
    + }
    + spin_unlock(&rdp->lock);
    + local_irq_restore(flags);
    + if (wake_gp) {
    +
    + /* Wake up grace-period processing, unless someone beat us. */
    +
    + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    + if (rcu_ctrlblk.sched_sleep == rcu_sched_sleeping)
    + rcu_ctrlblk.sched_sleep = rcu_sched_not_sleeping;
    + else
    + wake_gp = 0;
    + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    + if (wake_gp)
    + wake_up(&rcu_ctrlblk.sched_wq);
    + }
    +}
    +EXPORT_SYMBOL_GPL(call_rcu_sched);
    +
    /*
    * Wait until all currently running preempt_disable() code segments
    * (including hardware-irq-disable segments) complete. Note that
    * in -rt this does -not- necessarily result in all currently executing
    * interrupt -handlers- having completed.
    */
    -void __synchronize_sched(void)
    +synchronize_rcu_xxx(__synchronize_sched, call_rcu_sched)
    +EXPORT_SYMBOL_GPL(__synchronize_sched);
    +
    +/*
    + * kthread function that manages call_rcu_sched grace periods.
    + */
    +static int
    +rcu_sched_grace_period(void *arg)
    {
    - cpumask_t oldmask;
    + int couldsleep;
    + int couldsleepnext = 0;
    int cpu;
    + unsigned long flags;
    + int needsoftirq;
    + struct rcu_data *rdp;

    - if (sched_getaffinity(0, &oldmask) < 0)
    - oldmask = cpu_possible_map;
    - for_each_online_cpu(cpu) {
    - sched_setaffinity(0, cpumask_of_cpu(cpu));
    - schedule();
    - }
    - sched_setaffinity(0, oldmask);
    + /*
    + * Each pass through the following loop handles one
    + * rcu_sched grace period cycle.
    + */
    +
    + do {
    + /*
    + * Sleep for about an RCU grace-period's worth to
    + * allow better batching and to consume less CPU.
    + */
    +
    + schedule_timeout_interruptible(HZ / 20);
    +
    + /*
    + * If there was nothing to do last time, prepare to
    + * sleep at the end of the current grace period cycle.
    + */
    +
    + couldsleep = couldsleepnext;
    + couldsleepnext = 1;
    + if (couldsleep) {
    + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    + rcu_ctrlblk.sched_sleep = rcu_sched_sleep_prep;
    + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    + }
    +
    + /*
    + * Schedule on each CPU in turn, advancing callbacks
    + * as we go. We will have visited each CPU between
    + * the time we move a callback from the nextsched
    + * list and the time we move that callback to the
    + * done list. Now, a given CPU might come online
    + * during that interval, but that means that it
    + * was offline when we started, so we can safely
    + * ignore it.
    + */
    +
    + for_each_online_cpu(cpu) {
    +
    + /* Initialize and schedule onto current CPU. */
    +
    + needsoftirq = 0;
    + sched_setaffinity(0, cpumask_of_cpu(cpu)); /*@@@fail?*/
    + schedule();
    +
    + /*
    + * Get a reference to this CPU's rcu_data
    + * structure, lock it, and verify that this
    + * CPU is still online (skip it otherwise).
    + */
    +
    + rdp = RCU_DATA_CPU(cpu);
    + spin_lock_irqsave(&rdp->lock, flags);
    + if (cpu_is_offline(cpu)) {
    + spin_unlock_irqrestore(&rdp->lock, flags);
    + continue;
    + }
    +
    + /*
    + * We are running on the CPU irq-disabled, so it
    + * cannot go offline until we re-enable irqs.
    + *
    + * Advance the callbacks! We share normal RCU's
    + * donelist, since callbacks are invoked the
    + * same way in either case.
    + */
    +
    + if (rdp->waitschedlist != NULL) {
    + *rdp->donetail = rdp->waitschedlist;
    + rdp->donetail = rdp->waitschedtail;
    + needsoftirq = 1;
    + }
    + if (rdp->nextschedlist != NULL) {
    + rdp->waitschedlist = rdp->nextschedlist;
    + rdp->waitschedtail = rdp->nextschedtail;
    + couldsleepnext = couldsleep = 0;
    + } else {
    + rdp->waitschedlist = NULL;
    + rdp->waitschedtail = &rdp->waitschedlist;
    + }
    + rdp->nextschedlist = NULL;
    + rdp->nextschedtail = &rdp->nextschedlist;
    +
    + /* Mark sleep intention. */
    +
    + rdp->rcu_sched_sleeping = couldsleep;
    +
    + spin_unlock_irqrestore(&rdp->lock, flags);
    +
    + /* If we added callbacks to donelist, process. */
    +
    + if (needsoftirq)
    + raise_softirq(RCU_SOFTIRQ);
    + }
    +
    + /* If we saw callbacks on the last scan, go deal with them. */
    +
    + if (!couldsleep) {
    + continue;
    + }
    +
    + /* Attempt to block... */
    +
    + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleep_prep) {
    +
    + /*
    + * Someone posted a callback after we scanned.
    + * Go take care of it.
    + */
    +
    + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    + couldsleepnext = 0;
    + continue;
    + }
    +
    + /* Block until the next person posts a callback. */
    +
    + rcu_ctrlblk.sched_sleep = rcu_sched_sleeping;
    + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    + __wait_event(rcu_ctrlblk.sched_wq,
    + rcu_ctrlblk.sched_sleep != rcu_sched_sleeping);
    + couldsleepnext = 0;
    +
    + } while (!kthread_should_stop());
    +
    + return (0);
    }
    -EXPORT_SYMBOL_GPL(__synchronize_sched);

    /*
    * Check to see if any future RCU-related work will need to be done
    @@ -1107,6 +1324,11 @@ void __init __rcu_init(void)
    rdp->donetail = &rdp->donelist;
    rdp->rcu_flipctr[0] = 0;
    rdp->rcu_flipctr[1] = 0;
    + rdp->nextschedlist = NULL;
    + rdp->nextschedtail = &rdp->nextschedlist;
    + rdp->waitschedlist = NULL;
    + rdp->waitschedtail = &rdp->waitschedlist;
    + rdp->rcu_sched_sleeping = 0;
    }
    register_cpu_notifier(&rcu_nb);

    @@ -1129,6 +1351,18 @@ void __init __rcu_init(void)
    }

    /*
    + * Late-boot-time RCU initialization that must wait until after scheduler
    + * has been initialized.
    + */
    +void __init rcu_init_sched(void)
    +{
    + rcu_sched_grace_period_task = kthread_run(rcu_sched_grace_period,
    + NULL,
    + "rcu_sched_grace_period");
    + WARN_ON(IS_ERR(rcu_sched_grace_period_task));
    +}
    +
    +/*
    * Deprecated, use synchronize_rcu() or synchronize_sched() instead.
    */
    void synchronize_kernel(void)
    --

  2. [PATCH,RFC] Add call_rcu_sched()

    Hello!

    Second cut of patch to provide the call_rcu_sched() needed for Mathieu's
    markers implementation. This is again to synchronize_sched() as
    call_rcu() is to synchronize_rcu().

    Should be fine for experimental use, but not ready for inclusion.

    Passes short rcutorture sessions, but should be treated with some caution
    given that very little of it is more than 24 hours old. Changes since the
    first version include a fix for a bug that could result in indefinite
    blocking (spotted by Gautham Shenoy), better resiliency against
    CPU-hotplug operations, and other minor fixes.

    Known/suspected shortcomings:

    o Only moderately tested -- only short rcutorture sessions.

    o Need to add call_rcu_sched() testing to rcutorture.

    o If I remember correctly, an rcu_barrier_sched() is required
    (Mathieu?).

    o Interaction of this patch with CPU hotplug should be viewed
    with great suspicion.

    o If there are no synchronize_sched() calls for more than two
    minutes, one can see messages of the form "INFO: task
    rcu_sched_grace:924 blocked for more than 120 seconds."
    Any thoughts on how to avoid this message? Should I be using
    something other than __wait_event() and wake_up(), which sleep
    uninterruptibly, thus triggering this message?

    One other thing -- this patch also fixes a long-standing bug in the
    earlier preemptable-RCU implementation of synchronize_sched() that could
    result in loss of concurrent external changes to a task's CPU affinity
    mask. I have lost track of who reported this...
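
    For reference, the race in the old preemptable-RCU __synchronize_sched()
    (the code removed in the diff below) went roughly like this -- a sketch
    of the removed path with the problem window marked:

    cpumask_t oldmask;

    if (sched_getaffinity(0, &oldmask) < 0)	/* snapshot our affinity mask */
    	oldmask = cpu_possible_map;
    for_each_online_cpu(cpu) {
    	sched_setaffinity(0, cpumask_of_cpu(cpu));
    	/* <-- another task may legitimately change our affinity here */
    	schedule();
    }
    sched_setaffinity(0, oldmask);	/* restores the stale snapshot,
    					   silently discarding that change */

    The new code never touches the caller's affinity mask at all; the
    per-CPU scheduling is instead done by the dedicated
    rcu_sched_grace_period() kthread.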

    Signed-off-by: Paul E. McKenney
    ---

    include/linux/rcuclassic.h | 3
    include/linux/rcupdate.h | 22 +++
    include/linux/rcupreempt.h | 15 ++
    init/main.c | 1
    kernel/rcupdate.c | 20 ---
    kernel/rcupreempt.c | 276 +++++++++++++++++++++++++++++++++++++++++++--
    6 files changed, 308 insertions(+), 29 deletions(-)

    diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcuclassic.h linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcuclassic.h
    --- linux-2.6.25-rc6/include/linux/rcuclassic.h 2008-03-16 17:45:16.000000000 -0700
    +++ linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcuclassic.h 2008-03-21 04:27:31.000000000 -0700
    @@ -153,7 +153,10 @@ extern struct lockdep_map rcu_lock_map;

    #define __synchronize_sched() synchronize_rcu()

    +#define call_rcu_sched(head, func) call_rcu(head, func)
    +
    extern void __rcu_init(void);
    +#define rcu_init_sched() do { } while (0)
    extern void rcu_check_callbacks(int cpu, int user);
    extern void rcu_restart_cpu(int cpu);

    diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcupdate.h linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupdate.h
    --- linux-2.6.25-rc6/include/linux/rcupdate.h 2008-03-16 17:45:16.000000000 -0700
    +++ linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupdate.h 2008-03-20 21:10:42.000000000 -0700
    @@ -42,6 +42,7 @@
    #include
    #include
    #include
    +#include

    /**
    * struct rcu_head - callback structure for use with RCU
    @@ -182,6 +183,27 @@ struct rcu_head {
    (p) = (v); \
    })

    +/* Infrastructure to implement the synchronize_() primitives. */
    +
    +struct rcu_synchronize {
    + struct rcu_head head;
    + struct completion completion;
    +};
    +
    +extern void wakeme_after_rcu(struct rcu_head *head);
    +
    +#define synchronize_rcu_xxx(name, func) \
    +void name(void) \
    +{ \
    + struct rcu_synchronize rcu; \
    + \
    + init_completion(&rcu.completion); \
    + /* Will wake me after RCU finished. */ \
    + func(&rcu.head, wakeme_after_rcu); \
    + /* Wait for it. */ \
    + wait_for_completion(&rcu.completion); \
    +}
    +
    /**
    * synchronize_sched - block until all CPUs have exited any non-preemptive
    * kernel code sequences.
    diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcupreempt.h linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupreempt.h
    --- linux-2.6.25-rc6/include/linux/rcupreempt.h 2008-03-16 17:45:16.000000000 -0700
    +++ linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupreempt.h 2008-03-21 04:31:29.000000000 -0700
    @@ -46,6 +46,20 @@
    #define rcu_bh_qsctr_inc(cpu)
    #define call_rcu_bh(head, rcu) call_rcu(head, rcu)

    +/**
    + * call_rcu_sched - Queue RCU callback for invocation after sched grace period.
    + * @head: structure to be used for queueing the RCU updates.
    + * @func: actual update function to be invoked after the grace period
    + *
    + * The update function will be invoked some time after a full
    + * synchronize_sched()-style grace period elapses, in other words after
    + * all currently executing preempt-disabled sections of code (including
    + * hardirq handlers, NMI handlers, and local_irq_save() blocks) have
    + * completed.
    + */
    +extern void call_rcu_sched(struct rcu_head *head,
    + void (*func)(struct rcu_head *head));
    +
    extern void __rcu_read_lock(void) __acquires(RCU);
    extern void __rcu_read_unlock(void) __releases(RCU);
    extern int rcu_pending(int cpu);
    @@ -57,6 +71,7 @@ extern int rcu_needs_cpu(int cpu);
    extern void __synchronize_sched(void);

    extern void __rcu_init(void);
    +extern void rcu_init_sched(void);
    extern void rcu_check_callbacks(int cpu, int user);
    extern void rcu_restart_cpu(int cpu);
    extern long rcu_batches_completed(void);
    diff -urpNa -X dontdiff linux-2.6.25-rc6/init/main.c linux-2.6.25-rc6-C1-call_rcu_sched/init/main.c
    --- linux-2.6.25-rc6/init/main.c 2008-03-16 17:45:17.000000000 -0700
    +++ linux-2.6.25-rc6-C1-call_rcu_sched/init/main.c 2008-03-21 04:31:31.000000000 -0700
    @@ -736,6 +736,7 @@ static void __init do_basic_setup(void)
    driver_init();
    init_irq_proc();
    do_initcalls();
    + rcu_init_sched();
    }

    static int __initdata nosoftlockup;
    diff -urpNa -X dontdiff linux-2.6.25-rc6/kernel/rcupdate.c linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupdate.c
    --- linux-2.6.25-rc6/kernel/rcupdate.c 2008-03-16 17:45:17.000000000 -0700
    +++ linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupdate.c 2008-03-20 21:10:39.000000000 -0700
    @@ -39,18 +39,12 @@
    #include
    #include
    #include
    -#include
    #include
    #include
    #include
    #include
    #include

    -struct rcu_synchronize {
    - struct rcu_head head;
    - struct completion completion;
    -};
    -
    static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
    static atomic_t rcu_barrier_cpu_count;
    static DEFINE_MUTEX(rcu_barrier_mutex);
    @@ -60,7 +54,7 @@ static struct completion rcu_barrier_com
    * Awaken the corresponding synchronize_rcu() instance now that a
    * grace period has elapsed.
    */
    -static void wakeme_after_rcu(struct rcu_head *head)
    +void wakeme_after_rcu(struct rcu_head *head)
    {
    struct rcu_synchronize *rcu;

    @@ -77,17 +71,7 @@ static void wakeme_after_rcu(struct rcu_
    * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
    * and may be nested.
    */
    -void synchronize_rcu(void)
    -{
    - struct rcu_synchronize rcu;
    -
    - init_completion(&rcu.completion);
    - /* Will wake me after RCU finished */
    - call_rcu(&rcu.head, wakeme_after_rcu);
    -
    - /* Wait for it */
    - wait_for_completion(&rcu.completion);
    -}
    +synchronize_rcu_xxx(synchronize_rcu, call_rcu)
    EXPORT_SYMBOL_GPL(synchronize_rcu);

    static void rcu_barrier_callback(struct rcu_head *notused)
    diff -urpNa -X dontdiff linux-2.6.25-rc6/kernel/rcupreempt.c linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupreempt.c
    --- linux-2.6.25-rc6/kernel/rcupreempt.c 2008-03-16 17:45:17.000000000 -0700
    +++ linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupreempt.c 2008-03-21 12:44:24.000000000 -0700
    @@ -46,6 +46,7 @@
    #include
    #include
    #include
    +#include
    #include
    #include
    #include
    @@ -87,9 +88,14 @@ struct rcu_data {
    struct rcu_head **nexttail;
    struct rcu_head *waitlist[GP_STAGES];
    struct rcu_head **waittail[GP_STAGES];
    - struct rcu_head *donelist;
    + struct rcu_head *donelist; /* from waitlist & waitschedlist */
    struct rcu_head **donetail;
    long rcu_flipctr[2];
    + struct rcu_head *nextschedlist;
    + struct rcu_head **nextschedtail;
    + struct rcu_head *waitschedlist;
    + struct rcu_head **waitschedtail;
    + int rcu_sched_sleeping;
    #ifdef CONFIG_RCU_TRACE
    struct rcupreempt_trace trace;
    #endif /* #ifdef CONFIG_RCU_TRACE */
    @@ -131,11 +137,24 @@ enum rcu_try_flip_states {
    rcu_try_flip_waitmb_state,
    };

    +/*
    + * States for rcu_ctrlblk.rcu_sched_sleep.
    + */
    +
    +enum rcu_sched_sleep_states {
    + rcu_sched_not_sleeping, /* Not sleeping, callbacks need GP. */
    + rcu_sched_sleep_prep, /* Thinking of sleeping, rechecking. */
    + rcu_sched_sleeping, /* Sleeping, awaken if GP needed. */
    +};
    +
    struct rcu_ctrlblk {
    spinlock_t fliplock; /* Protect state-machine transitions. */
    long completed; /* Number of last completed batch. */
    enum rcu_try_flip_states rcu_try_flip_state; /* The current state of
    the rcu state machine */
    + spinlock_t schedlock; /* Protect rcu_sched sleep state. */
    + enum rcu_sched_sleep_states sched_sleep; /* rcu_sched state. */
    + wait_queue_head_t sched_wq; /* Place for rcu_sched to sleep. */
    };

    static DEFINE_PER_CPU(struct rcu_data, rcu_data);
    @@ -143,8 +162,12 @@ static struct rcu_ctrlblk rcu_ctrlblk =
    .fliplock = __SPIN_LOCK_UNLOCKED(rcu_ctrlblk.fliplock),
    .completed = 0,
    .rcu_try_flip_state = rcu_try_flip_idle_state,
    + .schedlock = __SPIN_LOCK_UNLOCKED(rcu_ctrlblk.schedlock),
    + .sched_sleep = rcu_sched_not_sleeping,
    + .sched_wq = __WAIT_QUEUE_HEAD_INITIALIZER(rcu_ctrlblk.sched_wq),
    };

    +static struct task_struct *rcu_sched_grace_period_task;

    #ifdef CONFIG_RCU_TRACE
    static char *rcu_try_flip_state_names[] =
    @@ -871,6 +894,8 @@ void rcu_offline_cpu(int cpu)
    struct rcu_head *list = NULL;
    unsigned long flags;
    struct rcu_data *rdp = RCU_DATA_CPU(cpu);
    + struct rcu_head *schedlist = NULL;
    + struct rcu_head **schedtail = &schedlist;
    struct rcu_head **tail = &list;

    /*
    @@ -884,6 +909,11 @@ void rcu_offline_cpu(int cpu)
    rcu_offline_cpu_enqueue(rdp->waitlist[i], rdp->waittail[i],
    list, tail);
    rcu_offline_cpu_enqueue(rdp->nextlist, rdp->nexttail, list, tail);
    + rcu_offline_cpu_enqueue(rdp->waitschedlist, rdp->waitschedtail,
    + schedlist, schedtail);
    + rcu_offline_cpu_enqueue(rdp->nextschedlist, rdp->nextschedtail,
    + schedlist, schedtail);
    + rdp->rcu_sched_sleeping = 0;
    spin_unlock_irqrestore(&rdp->lock, flags);
    rdp->waitlistcount = 0;

    @@ -924,16 +954,35 @@ void rcu_offline_cpu(int cpu)
    *rdp->nexttail = list;
    if (list)
    rdp->nexttail = tail;
    + *rdp->nextschedtail = schedlist;
    + if (schedlist)
    + rdp->nextschedtail = schedtail;
    spin_unlock_irqrestore(&rdp->lock, flags);
    }

    void __devinit rcu_online_cpu(int cpu)
    {
    unsigned long flags;
    + struct rcu_data *rdp;

    spin_lock_irqsave(&rcu_ctrlblk.fliplock, flags);
    cpu_set(cpu, rcu_cpu_online_map);
    spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, flags);
    +
    + /*
    + * The rcu_sched grace-period processing might have bypassed
    + * this CPU, given that it was not in the rcu_cpu_online_map
    + * when the grace-period scan started. This means that the
    + * grace-period task might sleep. So make sure that if this
    + * should happen, the first callback posted to this CPU will
    + * wake up the grace-period task if need be.
    + */
    +
    + local_irq_save(flags);
    + rdp = RCU_DATA_ME();
    + spin_lock(&rdp->lock);
    + rdp->rcu_sched_sleeping = 1;
    + spin_unlock_irqrestore(&rdp->lock, flags);
    }

    #else /* #ifdef CONFIG_HOTPLUG_CPU */
    @@ -993,26 +1042,214 @@ void call_rcu(struct rcu_head *head, voi
    }
    EXPORT_SYMBOL_GPL(call_rcu);

    +void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
    +{
    + unsigned long flags;
    + struct rcu_data *rdp;
    + int wake_gp = 0;
    +
    + head->func = func;
    + head->next = NULL;
    + local_irq_save(flags);
    + rdp = RCU_DATA_ME();
    + spin_lock(&rdp->lock);
    + *rdp->nextschedtail = head;
    + rdp->nextschedtail = &head->next;
    + if (rdp->rcu_sched_sleeping) {
    +
    + /* Grace-period processing might be sleeping... */
    +
    + rdp->rcu_sched_sleeping = 0;
    + wake_gp = 1;
    + }
    + spin_unlock(&rdp->lock);
    + local_irq_restore(flags);
    + if (wake_gp) {
    +
    + /* Wake up grace-period processing, unless someone beat us. */
    +
    + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleeping)
    + wake_gp = 0;
    + rcu_ctrlblk.sched_sleep = rcu_sched_not_sleeping;
    + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    + if (wake_gp)
    + wake_up(&rcu_ctrlblk.sched_wq);
    + }
    +}
    +EXPORT_SYMBOL_GPL(call_rcu_sched);
    +
    /*
    * Wait until all currently running preempt_disable() code segments
    * (including hardware-irq-disable segments) complete. Note that
    * in -rt this does -not- necessarily result in all currently executing
    * interrupt -handlers- having completed.
    */
    -void __synchronize_sched(void)
    +synchronize_rcu_xxx(__synchronize_sched, call_rcu_sched)
    +EXPORT_SYMBOL_GPL(__synchronize_sched);
    +
    +/*
    + * kthread function that manages call_rcu_sched grace periods.
    + */
    +static int
    +rcu_sched_grace_period(void *arg)
    {
    - cpumask_t oldmask;
    + int couldsleep; /* might sleep after current pass. */
    + int couldsleepnext = 0; /* might sleep after next pass. */
    int cpu;
    + long err;
    + unsigned long flags;
    + int needsoftirq;
    + struct rcu_data *rdp;

    - if (sched_getaffinity(0, &oldmask) < 0)
    - oldmask = cpu_possible_map;
    - for_each_online_cpu(cpu) {
    - sched_setaffinity(0, cpumask_of_cpu(cpu));
    - schedule();
    - }
    - sched_setaffinity(0, oldmask);
    + /*
    + * Each pass through the following loop handles one
    + * rcu_sched grace period cycle.
    + */
    +
    + do {
    +
    + /*
    + * Sleep for about an RCU grace-period's worth to
    + * allow better batching and to consume less CPU.
    + */
    +
    + schedule_timeout_interruptible(HZ / 20);
    +
    + /*
    + * If there was nothing to do last time, prepare to
    + * sleep at the end of the current grace period cycle.
    + */
    +
    + couldsleep = couldsleepnext;
    + couldsleepnext = 1;
    + if (couldsleep) {
    + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    + rcu_ctrlblk.sched_sleep = rcu_sched_sleep_prep;
    + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    + }
    +
    + /*
    + * Schedule on each CPU in turn, advancing callbacks
    + * as we go. We will have visited each CPU between
    + * the time we move a callback from the nextsched
    + * list and the time we move that callback to the
    + * done list. Now, a given CPU might come online
    + * during that interval, but that means that it
    + * was offline when we started, so we can safely
    + * ignore it.
    + */
    +
    + for_each_online_cpu(cpu) {
    +
    +retry:
    +
    + /* Initialize and schedule onto current CPU. */
    +
    + needsoftirq = 0;
    + err = sched_setaffinity(0, cpumask_of_cpu(cpu));
    + if (err < 0) {
    + printk(KERN_WARNING "sched_setaffinity(%d) error: %ld, cpu_is_offline: %ld\n", cpu, err, cpu_is_offline(cpu));
    + schedule_timeout_interruptible(HZ);
    + continue;
    + }
    +
    + /*
    + * Get a reference to this CPU's rcu_data
    + * structure, lock it, and verify that this
    + * CPU is still online (skip it otherwise).
    + */
    +
    + rdp = RCU_DATA_CPU(cpu);
    + spin_lock_irqsave(&rdp->lock, flags);
    + if (cpu_is_offline(cpu)) {
    + spin_unlock_irqrestore(&rdp->lock, flags);
    + continue;
    + }
    +
    + /*
    + * If we didn't end up on the CPU we expected
    + * to, try again. This can happen if a CPU
    + * goes offline before we attempt to schedule
    + * on it, but comes back online before we get
    + * to this check.
    + */
    +
    + if (smp_processor_id() != cpu) {
    + spin_unlock_irqrestore(&rdp->lock, flags);
    + goto retry;
    + }
    +
    + /*
    + * We are running on the CPU irq-disabled, so it
    + * cannot go offline until we re-enable irqs.
    + *
    + * Advance the callbacks! We share normal RCU's
    + * donelist, since callbacks are invoked the
    + * same way in either case.
    + */
    +
    + if (rdp->waitschedlist != NULL) {
    + *rdp->donetail = rdp->waitschedlist;
    + rdp->donetail = rdp->waitschedtail;
    + needsoftirq = 1;
    + }
    + if (rdp->nextschedlist != NULL) {
    + rdp->waitschedlist = rdp->nextschedlist;
    + rdp->waitschedtail = rdp->nextschedtail;
    + couldsleep = 0;
    + couldsleepnext = 0;
    + } else {
    + rdp->waitschedlist = NULL;
    + rdp->waitschedtail = &rdp->waitschedlist;
    + }
    + rdp->nextschedlist = NULL;
    + rdp->nextschedtail = &rdp->nextschedlist;
    +
    + /* Mark sleep intention. */
    +
    + rdp->rcu_sched_sleeping = couldsleep;
    +
    + spin_unlock_irqrestore(&rdp->lock, flags);
    +
    + /* If we added callbacks to donelist, process. */
    +
    + if (needsoftirq)
    + raise_softirq(RCU_SOFTIRQ);
    + }
    +
    + /* If we saw callbacks on the last scan, go deal with them. */
    +
    + if (!couldsleep)
    + continue;
    +
    + /* Attempt to block... */
    +
    + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleep_prep) {
    +
    + /*
    + * Someone posted a callback after we scanned.
    + * Go take care of it.
    + */
    +
    + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    + couldsleepnext = 0;
    + continue;
    + }
    +
    + /* Block until the next person posts a callback. */
    +
    + rcu_ctrlblk.sched_sleep = rcu_sched_sleeping;
    + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    + __wait_event(rcu_ctrlblk.sched_wq,
    + rcu_ctrlblk.sched_sleep != rcu_sched_sleeping);
    + couldsleepnext = 0;
    +
    + } while (!kthread_should_stop());
    +
    + return (0);
    }
    -EXPORT_SYMBOL_GPL(__synchronize_sched);

    /*
    * Check to see if any future RCU-related work will need to be done
    @@ -1107,6 +1344,11 @@ void __init __rcu_init(void)
    rdp->donetail = &rdp->donelist;
    rdp->rcu_flipctr[0] = 0;
    rdp->rcu_flipctr[1] = 0;
    + rdp->nextschedlist = NULL;
    + rdp->nextschedtail = &rdp->nextschedlist;
    + rdp->waitschedlist = NULL;
    + rdp->waitschedtail = &rdp->waitschedlist;
    + rdp->rcu_sched_sleeping = 0;
    }
    register_cpu_notifier(&rcu_nb);

    @@ -1129,6 +1371,18 @@ void __init __rcu_init(void)
    }

    /*
    + * Late-boot-time RCU initialization that must wait until after scheduler
    + * has been initialized.
    + */
    +void __init rcu_init_sched(void)
    +{
    + rcu_sched_grace_period_task = kthread_run(rcu_sched_grace_period,
    + NULL,
    + "rcu_sched_grace_period");
    + WARN_ON(IS_ERR(rcu_sched_grace_period_task));
    +}
    +
    +/*
    * Deprecated, use synchronize_rcu() or synchronize_sched() instead.
    */
    void synchronize_kernel(void)
    --

  3. Re: [PATCH,RFC] Add call_rcu_sched()

    * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
    > Hello!
    >
    > Second cut of patch to provide the call_rcu_sched() needed for Mathieu's
    > markers implementation. This is again to synchronize_sched() as
    > call_rcu() is to synchronize_rcu().
    >
    > Should be fine for experimental use, but not ready for inclusion.
    >
    > Passes short rcutorture sessions, but should be treated with some caution
    > given that very little of it is more than 24 hours old. Fixes since the
    > first version include a bug that could result in indefinite blocking
    > (spotted by Gautham Shenoy), better resiliency against CPU-hotplug
    > operations, and other minor fixes.
    >
    > Known/suspected shortcomings:
    >
    > o Only moderately tested -- only short rcutorture sessions.
    >
    > o Need to add call_rcu_sched() testing to rcutorture.
    >
    > o If I remember correctly, an rcu_barrier_sched() is required
    > (Mathieu?).
    >


    Hi Paul,

    Thanks for this work; I'll give it a try (I'm just back from a weekend
    away from the city). Yes, my code needs an rcu_barrier_sched() so it can
    wait for call_rcu_sched() completion before it tries to re-use the data
    structures at the next modification of the same marker.
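
    (For illustration, the intended ordering on the markers side would be
    something like the sketch below; the structure and function names are
    invented, not the actual markers code.)

    struct marker_private {
    	struct rcu_head rcu;
    	/* probe pointers, format string, private state, ... */
    };

    struct marker_entry {
    	struct marker_private *private;
    };

    static void marker_free_private(struct rcu_head *head)
    {
    	kfree(container_of(head, struct marker_private, rcu));
    }

    static void marker_modify(struct marker_entry *entry,
    			  struct marker_private *new)
    {
    	struct marker_private *old;

    	/*
    	 * Wait for the deferred free queued by the previous
    	 * modification of this marker, so that the memory it
    	 * released can safely be re-used.
    	 */
    	rcu_barrier_sched();

    	old = entry->private;
    	rcu_assign_pointer(entry->private, new);

    	/* Defer the free until all preempt-disabled probe callers finish. */
    	if (old)
    		call_rcu_sched(&old->rcu, marker_free_private);
    }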

    I think rcu_barrier_sched() should be quite straightforward to implement
    if we derive it from kernel/rcupdate.c:rcu_barrier(). Actually, couldn't
    we just rename rcu_barrier() to a static helper (_rcu_barrier()) and call
    it with a parameter telling it whether to use call_rcu() or
    call_rcu_sched()?

    Something like this:

    Add rcu_barrier_sched

    Adds rcu_barrier_sched(), which uses call_rcu_sched(). It waits for each
    in-flight call_rcu_sched() callback to complete before it returns.

    Signed-off-by: Mathieu Desnoyers
    ---
    include/linux/rcupdate.h | 1 +
    kernel/rcupdate.c | 40 +++++++++++++++++++++++++++++++++-------
    2 files changed, 34 insertions(+), 7 deletions(-)

    Index: linux-2.6-lttng/include/linux/rcupdate.h
    ===================================================================
    --- linux-2.6-lttng.orig/include/linux/rcupdate.h 2008-03-24 00:13:26.000000000 -0400
    +++ linux-2.6-lttng/include/linux/rcupdate.h 2008-03-24 00:13:36.000000000 -0400
    @@ -260,6 +260,7 @@ extern void call_rcu_bh(struct rcu_head
    /* Exported common interfaces */
    extern void synchronize_rcu(void);
    extern void rcu_barrier(void);
    +extern void rcu_barrier_sched(void);
    extern long rcu_batches_completed(void);
    extern long rcu_batches_completed_bh(void);

    Index: linux-2.6-lttng/kernel/rcupdate.c
    ===================================================================
    --- linux-2.6-lttng.orig/kernel/rcupdate.c 2008-03-24 00:07:15.000000000 -0400
    +++ linux-2.6-lttng/kernel/rcupdate.c 2008-03-24 00:17:01.000000000 -0400
    @@ -45,6 +45,11 @@
    #include
    #include

    +enum rcu_barrier {
    + RCU_BARRIER_STD,
    + RCU_BARRIER_SCHED,
    +};
    +
    static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
    static atomic_t rcu_barrier_cpu_count;
    static DEFINE_MUTEX(rcu_barrier_mutex);
    @@ -83,19 +88,23 @@ static void rcu_barrier_callback(struct
    /*
    * Called with preemption disabled, and from cross-cpu IRQ context.
    */
    -static void rcu_barrier_func(void *notused)
    +static void rcu_barrier_func(void *type)
    {
    int cpu = smp_processor_id();
    struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);

    atomic_inc(&rcu_barrier_cpu_count);
    - call_rcu(head, rcu_barrier_callback);
    + switch((enum rcu_barrier)type) {
    + case RCU_BARRIER_STD:
    + call_rcu(head, rcu_barrier_callback);
    + break;
    + case RCU_BARRIER_SCHED:
    + call_rcu_sched(head, rcu_barrier_callback);
    + break;
    + }
    }

    -/**
    - * rcu_barrier - Wait until all the in-flight RCUs are complete.
    - */
    -void rcu_barrier(void)
    +static void _rcu_barrier(enum rcu_barrier type)
    {
    BUG_ON(in_interrupt());
    /* Take cpucontrol mutex to protect against CPU hotplug */
    @@ -111,13 +120,30 @@ void rcu_barrier(void)
    * until all the callbacks are queued.
    */
    rcu_read_lock();
    - on_each_cpu(rcu_barrier_func, NULL, 0, 1);
    + on_each_cpu(rcu_barrier_func, (void *)type, 0, 1);
    rcu_read_unlock();
    wait_for_completion(&rcu_barrier_completion);
    mutex_unlock(&rcu_barrier_mutex);
    }
    +
    +/**
    + * rcu_barrier - Wait until all the in-flight RCUs are complete.
    + */
    +void rcu_barrier(void)
    +{
    + _rcu_barrier(RCU_BARRIER_STD);
    +}
    EXPORT_SYMBOL_GPL(rcu_barrier);

    +/**
    + * rcu_barrier_sched - Wait until all the in-flight call_rcu_sched are complete.
    + */
    +void rcu_barrier_sched(void)
    +{
    + _rcu_barrier(RCU_BARRIER_SCHED);
    +}
    +EXPORT_SYMBOL_GPL(rcu_barrier_sched);
    +
    void __init rcu_init(void)
    {
    __rcu_init();


    > o Interaction of this patch with CPU hotplug should be viewed
    > with great suspicion.
    >


    Fix call_rcu_sched wait

    > o If there are no synchronize_sched() calls for more than two
    > minutes, one can see messages of the form "INFO: task
    > rcu_sched_grace:924 blocked for more than 120 seconds."
    > Any thoughts on how to avoid this message? Should I be using
    > something other than __wait_event() and wake_up(), which sleep
    > uninterruptibly, thus triggering this message?
    >


    Could you use __wait_event_interruptible() and wake_up_interruptible()
    instead? softlockup.c only seems to complain when uninterruptible tasks
    are not scheduled for 2 minutes. I guess that when we receive a signal we
    could simply go through another pass of the loop.

    Signed-off-by: Mathieu Desnoyers
    ---
    kernel/rcupreempt.c | 9 ++++++---
    1 file changed, 6 insertions(+), 3 deletions(-)

    Index: linux-2.6-lttng/kernel/rcupreempt.c
    ===================================================================
    --- linux-2.6-lttng.orig/kernel/rcupreempt.c 2008-03-24 00:26:27.000000000 -0400
    +++ linux-2.6-lttng/kernel/rcupreempt.c 2008-03-24 00:33:47.000000000 -0400
    @@ -1074,7 +1074,7 @@ void call_rcu_sched(struct rcu_head *hea
    rcu_ctrlblk.sched_sleep = rcu_sched_not_sleeping;
    spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    if (wake_gp)
    - wake_up(&rcu_ctrlblk.sched_wq);
    + wake_up_interruptible(&rcu_ctrlblk.sched_wq);
    }
    }
    EXPORT_SYMBOL_GPL(call_rcu_sched);
    @@ -1097,6 +1097,7 @@ rcu_sched_grace_period(void *arg)
    int couldsleep; /* might sleep after current pass. */
    int couldsleepnext = 0; /* might sleep after next pass. */
    int cpu;
    + int ret;
    long err;
    unsigned long flags;
    int needsoftirq;
    @@ -1242,8 +1243,10 @@ retry:

    rcu_ctrlblk.sched_sleep = rcu_sched_sleeping;
    spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    - __wait_event(rcu_ctrlblk.sched_wq,
    - rcu_ctrlblk.sched_sleep != rcu_sched_sleeping);
    + ret = 0;
    + __wait_event_interruptible(rcu_ctrlblk.sched_wq,
    + rcu_ctrlblk.sched_sleep != rcu_sched_sleeping,
    + ret);
    couldsleepnext = 0;

    } while (!kthread_should_stop());

    > One other thing -- this patch also fixes a long-standing bug in the
    > earlier preemptable-RCU implementation of synchronize_rcu() that could
    > result in loss of concurrent external changes to a task's CPU affinity
    > mask. I have lost track of who reported this...
    >


    That's always good.

    Mathieu

    > Signed-off-by: Paul E. McKenney
    > ---
    >
    > include/linux/rcuclassic.h | 3
    > include/linux/rcupdate.h | 22 +++
    > include/linux/rcupreempt.h | 15 ++
    > init/main.c | 1
    > kernel/rcupdate.c | 20 ---
    > kernel/rcupreempt.c | 276 +++++++++++++++++++++++++++++++++++++++++++--
    > 6 files changed, 308 insertions(+), 29 deletions(-)
    >
    > diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcuclassic.h linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcuclassic.h
    > --- linux-2.6.25-rc6/include/linux/rcuclassic.h 2008-03-16 17:45:16.000000000 -0700
    > +++ linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcuclassic.h 2008-03-21 04:27:31.000000000 -0700
    > @@ -153,7 +153,10 @@ extern struct lockdep_map rcu_lock_map;
    >
    > #define __synchronize_sched() synchronize_rcu()
    >
    > +#define call_rcu_sched(head, func) call_rcu(head, func)
    > +
    > extern void __rcu_init(void);
    > +#define rcu_init_sched() do { } while (0)
    > extern void rcu_check_callbacks(int cpu, int user);
    > extern void rcu_restart_cpu(int cpu);
    >
    > diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcupdate.h linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupdate.h
    > --- linux-2.6.25-rc6/include/linux/rcupdate.h 2008-03-16 17:45:16.000000000 -0700
    > +++ linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupdate.h 2008-03-20 21:10:42.000000000 -0700
    > @@ -42,6 +42,7 @@
    > #include
    > #include
    > #include
    > +#include
    >
    > /**
    > * struct rcu_head - callback structure for use with RCU
    > @@ -182,6 +183,27 @@ struct rcu_head {
    > (p) = (v); \
    > })
    >
    > +/* Infrastructure to implement the synchronize_() primitives. */
    > +
    > +struct rcu_synchronize {
    > + struct rcu_head head;
    > + struct completion completion;
    > +};
    > +
    > +extern void wakeme_after_rcu(struct rcu_head *head);
    > +
    > +#define synchronize_rcu_xxx(name, func) \
    > +void name(void) \
    > +{ \
    > + struct rcu_synchronize rcu; \
    > + \
    > + init_completion(&rcu.completion); \
    > + /* Will wake me after RCU finished. */ \
    > + func(&rcu.head, wakeme_after_rcu); \
    > + /* Wait for it. */ \
    > + wait_for_completion(&rcu.completion); \
    > +}
    > +
    > /**
    > * synchronize_sched - block until all CPUs have exited any non-preemptive
    > * kernel code sequences.
    > diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcupreempt.h linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupreempt.h
    > --- linux-2.6.25-rc6/include/linux/rcupreempt.h 2008-03-16 17:45:16.000000000 -0700
    > +++ linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupreempt.h 2008-03-21 04:31:29.000000000 -0700
    > @@ -46,6 +46,20 @@
    > #define rcu_bh_qsctr_inc(cpu)
    > #define call_rcu_bh(head, rcu) call_rcu(head, rcu)
    >
    > +/**
    > + * call_rcu_sched - Queue RCU callback for invocation after sched grace period.
    > + * @head: structure to be used for queueing the RCU updates.
    > + * @func: actual update function to be invoked after the grace period
    > + *
    > + * The update function will be invoked some time after a full
    > + * synchronize_sched()-style grace period elapses, in other words after
    > + * all currently executing preempt-disabled sections of code (including
    > + * hardirq handlers, NMI handlers, and local_irq_save() blocks) have
    > + * completed.
    > + */
    > +extern void call_rcu_sched(struct rcu_head *head,
    > + void (*func)(struct rcu_head *head));
    > +
    > extern void __rcu_read_lock(void) __acquires(RCU);
    > extern void __rcu_read_unlock(void) __releases(RCU);
    > extern int rcu_pending(int cpu);
    > @@ -57,6 +71,7 @@ extern int rcu_needs_cpu(int cpu);
    > extern void __synchronize_sched(void);
    >
    > extern void __rcu_init(void);
    > +extern void rcu_init_sched(void);
    > extern void rcu_check_callbacks(int cpu, int user);
    > extern void rcu_restart_cpu(int cpu);
    > extern long rcu_batches_completed(void);
    > diff -urpNa -X dontdiff linux-2.6.25-rc6/init/main.c linux-2.6.25-rc6-C1-call_rcu_sched/init/main.c
    > --- linux-2.6.25-rc6/init/main.c 2008-03-16 17:45:17.000000000 -0700
    > +++ linux-2.6.25-rc6-C1-call_rcu_sched/init/main.c 2008-03-21 04:31:31.000000000 -0700
    > @@ -736,6 +736,7 @@ static void __init do_basic_setup(void)
    > driver_init();
    > init_irq_proc();
    > do_initcalls();
    > + rcu_init_sched();
    > }
    >
    > static int __initdata nosoftlockup;
    > diff -urpNa -X dontdiff linux-2.6.25-rc6/kernel/rcupdate.c linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupdate.c
    > --- linux-2.6.25-rc6/kernel/rcupdate.c 2008-03-16 17:45:17.000000000 -0700
    > +++ linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupdate.c 2008-03-20 21:10:39.000000000 -0700
    > @@ -39,18 +39,12 @@
    > #include
    > #include
    > #include
    > -#include
    > #include
    > #include
    > #include
    > #include
    > #include
    >
    > -struct rcu_synchronize {
    > - struct rcu_head head;
    > - struct completion completion;
    > -};
    > -
    > static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
    > static atomic_t rcu_barrier_cpu_count;
    > static DEFINE_MUTEX(rcu_barrier_mutex);
    > @@ -60,7 +54,7 @@ static struct completion rcu_barrier_com
    > * Awaken the corresponding synchronize_rcu() instance now that a
    > * grace period has elapsed.
    > */
    > -static void wakeme_after_rcu(struct rcu_head *head)
    > +void wakeme_after_rcu(struct rcu_head *head)
    > {
    > struct rcu_synchronize *rcu;
    >
    > @@ -77,17 +71,7 @@ static void wakeme_after_rcu(struct rcu_
    > * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
    > * and may be nested.
    > */
    > -void synchronize_rcu(void)
    > -{
    > - struct rcu_synchronize rcu;
    > -
    > - init_completion(&rcu.completion);
    > - /* Will wake me after RCU finished */
    > - call_rcu(&rcu.head, wakeme_after_rcu);
    > -
    > - /* Wait for it */
    > - wait_for_completion(&rcu.completion);
    > -}
    > +synchronize_rcu_xxx(synchronize_rcu, call_rcu)
    > EXPORT_SYMBOL_GPL(synchronize_rcu);
    >
    > static void rcu_barrier_callback(struct rcu_head *notused)
    > diff -urpNa -X dontdiff linux-2.6.25-rc6/kernel/rcupreempt.c linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupreempt.c
    > --- linux-2.6.25-rc6/kernel/rcupreempt.c 2008-03-16 17:45:17.000000000 -0700
    > +++ linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupreempt.c 2008-03-21 12:44:24.000000000 -0700
    > @@ -46,6 +46,7 @@
    > #include
    > #include
    > #include
    > +#include
    > #include
    > #include
    > #include
    > @@ -87,9 +88,14 @@ struct rcu_data {
    > struct rcu_head **nexttail;
    > struct rcu_head *waitlist[GP_STAGES];
    > struct rcu_head **waittail[GP_STAGES];
    > - struct rcu_head *donelist;
    > + struct rcu_head *donelist; /* from waitlist & waitschedlist */
    > struct rcu_head **donetail;
    > long rcu_flipctr[2];
    > + struct rcu_head *nextschedlist;
    > + struct rcu_head **nextschedtail;
    > + struct rcu_head *waitschedlist;
    > + struct rcu_head **waitschedtail;
    > + int rcu_sched_sleeping;
    > #ifdef CONFIG_RCU_TRACE
    > struct rcupreempt_trace trace;
    > #endif /* #ifdef CONFIG_RCU_TRACE */
    > @@ -131,11 +137,24 @@ enum rcu_try_flip_states {
    > rcu_try_flip_waitmb_state,
    > };
    >
    > +/*
    > + * States for rcu_ctrlblk.rcu_sched_sleep.
    > + */
    > +
    > +enum rcu_sched_sleep_states {
    > + rcu_sched_not_sleeping, /* Not sleeping, callbacks need GP. */
    > + rcu_sched_sleep_prep, /* Thinking of sleeping, rechecking. */
    > + rcu_sched_sleeping, /* Sleeping, awaken if GP needed. */
    > +};
    > +
    > struct rcu_ctrlblk {
    > spinlock_t fliplock; /* Protect state-machine transitions. */
    > long completed; /* Number of last completed batch. */
    > enum rcu_try_flip_states rcu_try_flip_state; /* The current state of
    > the rcu state machine */
    > + spinlock_t schedlock; /* Protect rcu_sched sleep state. */
    > + enum rcu_sched_sleep_states sched_sleep; /* rcu_sched state. */
    > + wait_queue_head_t sched_wq; /* Place for rcu_sched to sleep. */
    > };
    >
    > static DEFINE_PER_CPU(struct rcu_data, rcu_data);
    > @@ -143,8 +162,12 @@ static struct rcu_ctrlblk rcu_ctrlblk =
    > .fliplock = __SPIN_LOCK_UNLOCKED(rcu_ctrlblk.fliplock),
    > .completed = 0,
    > .rcu_try_flip_state = rcu_try_flip_idle_state,
    > + .schedlock = __SPIN_LOCK_UNLOCKED(rcu_ctrlblk.schedlock),
    > + .sched_sleep = rcu_sched_not_sleeping,
    > + .sched_wq = __WAIT_QUEUE_HEAD_INITIALIZER(rcu_ctrlblk.sched_wq),
    > };
    >
    > +static struct task_struct *rcu_sched_grace_period_task;
    >
    > #ifdef CONFIG_RCU_TRACE
    > static char *rcu_try_flip_state_names[] =
    > @@ -871,6 +894,8 @@ void rcu_offline_cpu(int cpu)
    > struct rcu_head *list = NULL;
    > unsigned long flags;
    > struct rcu_data *rdp = RCU_DATA_CPU(cpu);
    > + struct rcu_head *schedlist = NULL;
    > + struct rcu_head **schedtail = &schedlist;
    > struct rcu_head **tail = &list;
    >
    > /*
    > @@ -884,6 +909,11 @@ void rcu_offline_cpu(int cpu)
    > rcu_offline_cpu_enqueue(rdp->waitlist[i], rdp->waittail[i],
    > list, tail);
    > rcu_offline_cpu_enqueue(rdp->nextlist, rdp->nexttail, list, tail);
    > + rcu_offline_cpu_enqueue(rdp->waitschedlist, rdp->waitschedtail,
    > + schedlist, schedtail);
    > + rcu_offline_cpu_enqueue(rdp->nextschedlist, rdp->nextschedtail,
    > + schedlist, schedtail);
    > + rdp->rcu_sched_sleeping = 0;
    > spin_unlock_irqrestore(&rdp->lock, flags);
    > rdp->waitlistcount = 0;
    >
    > @@ -924,16 +954,35 @@ void rcu_offline_cpu(int cpu)
    > *rdp->nexttail = list;
    > if (list)
    > rdp->nexttail = tail;
    > + *rdp->nextschedtail = schedlist;
    > + if (schedlist)
    > + rdp->nextschedtail = schedtail;
    > spin_unlock_irqrestore(&rdp->lock, flags);
    > }
    >
    > void __devinit rcu_online_cpu(int cpu)
    > {
    > unsigned long flags;
    > + struct rcu_data *rdp;
    >
    > spin_lock_irqsave(&rcu_ctrlblk.fliplock, flags);
    > cpu_set(cpu, rcu_cpu_online_map);
    > spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, flags);
    > +
    > + /*
    > + * The rcu_sched grace-period processing might have bypassed
    > + * this CPU, given that it was not in the rcu_cpu_online_map
    > + * when the grace-period scan started. This means that the
    > + * grace-period task might sleep. So make sure that if this
    > + * should happen, the first callback posted to this CPU will
    > + * wake up the grace-period task if need be.
    > + */
    > +
    > + local_irq_save(flags);
    > + rdp = RCU_DATA_ME();
    > + spin_lock(&rdp->lock);
    > + rdp->rcu_sched_sleeping = 1;
    > + spin_unlock_irqrestore(&rdp->lock, flags);
    > }
    >
    > #else /* #ifdef CONFIG_HOTPLUG_CPU */
    > @@ -993,26 +1042,214 @@ void call_rcu(struct rcu_head *head, voi
    > }
    > EXPORT_SYMBOL_GPL(call_rcu);
    >
    > +void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
    > +{
    > + unsigned long flags;
    > + struct rcu_data *rdp;
    > + int wake_gp = 0;
    > +
    > + head->func = func;
    > + head->next = NULL;
    > + local_irq_save(flags);
    > + rdp = RCU_DATA_ME();
    > + spin_lock(&rdp->lock);
    > + *rdp->nextschedtail = head;
    > + rdp->nextschedtail = &head->next;
    > + if (rdp->rcu_sched_sleeping) {
    > +
    > + /* Grace-period processing might be sleeping... */
    > +
    > + rdp->rcu_sched_sleeping = 0;
    > + wake_gp = 1;
    > + }
    > + spin_unlock(&rdp->lock);
    > + local_irq_restore(flags);
    > + if (wake_gp) {
    > +
    > + /* Wake up grace-period processing, unless someone beat us. */
    > +
    > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    > + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleeping)
    > + wake_gp = 0;
    > + rcu_ctrlblk.sched_sleep = rcu_sched_not_sleeping;
    > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > + if (wake_gp)
    > + wake_up(&rcu_ctrlblk.sched_wq);
    > + }
    > +}
    > +EXPORT_SYMBOL_GPL(call_rcu_sched);
    > +
    > /*
    > * Wait until all currently running preempt_disable() code segments
    > * (including hardware-irq-disable segments) complete. Note that
    > * in -rt this does -not- necessarily result in all currently executing
    > * interrupt -handlers- having completed.
    > */
    > -void __synchronize_sched(void)
    > +synchronize_rcu_xxx(__synchronize_sched, call_rcu_sched)
    > +EXPORT_SYMBOL_GPL(__synchronize_sched);
    > +
    > +/*
    > + * kthread function that manages call_rcu_sched grace periods.
    > + */
    > +static int
    > +rcu_sched_grace_period(void *arg)
    > {
    > - cpumask_t oldmask;
    > + int couldsleep; /* might sleep after current pass. */
    > + int couldsleepnext = 0; /* might sleep after next pass. */
    > int cpu;
    > + long err;
    > + unsigned long flags;
    > + int needsoftirq;
    > + struct rcu_data *rdp;
    >
    > - if (sched_getaffinity(0, &oldmask) < 0)
    > - oldmask = cpu_possible_map;
    > - for_each_online_cpu(cpu) {
    > - sched_setaffinity(0, cpumask_of_cpu(cpu));
    > - schedule();
    > - }
    > - sched_setaffinity(0, oldmask);
    > + /*
    > + * Each pass through the following loop handles one
    > + * rcu_sched grace period cycle.
    > + */
    > +
    > + do {
    > +
    > + /*
    > + * Sleep for about an RCU grace-period's worth to
    > + * allow better batching and to consume less CPU.
    > + */
    > +
    > + schedule_timeout_interruptible(HZ / 20);
    > +
    > + /*
    > + * If there was nothing to do last time, prepare to
    > + * sleep at the end of the current grace period cycle.
    > + */
    > +
    > + couldsleep = couldsleepnext;
    > + couldsleepnext = 1;
    > + if (couldsleep) {
    > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    > + rcu_ctrlblk.sched_sleep = rcu_sched_sleep_prep;
    > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > + }
    > +
    > + /*
    > + * Schedule on each CPU in turn, advancing callbacks
    > + * as we go. We will have visited each CPU between
    > + * the time we move a callback from the nextsched
    > + * list and the time we move that callback to the
    > + * done list. Now, a given CPU might come online
    > + * during that interval, but that means that it
    > + * was offline when we started, so we can safely
    > + * ignore it.
    > + */
    > +
    > + for_each_online_cpu(cpu) {
    > +
    > +retry:
    > +
    > + /* Initialize and schedule onto current CPU. */
    > +
    > + needsoftirq = 0;
    > + err = sched_setaffinity(0, cpumask_of_cpu(cpu));
    > + if (err < 0) {
    > + printk(KERN_WARNING "sched_setaffinity(%d) error: %ld, cpu_is_offline: %ld\n", cpu, err, cpu_is_offline(cpu));
    > + schedule_timeout_interruptible(HZ);
    > + continue;
    > + }
    > +
    > + /*
    > + * Get a reference to this CPU's rcu_data
    > + * structure, lock it, and verify that this
    > + * CPU is still online (skip it otherwise).
    > + */
    > +
    > + rdp = RCU_DATA_CPU(cpu);
    > + spin_lock_irqsave(&rdp->lock, flags);
    > + if (cpu_is_offline(cpu)) {
    > + spin_unlock_irqrestore(&rdp->lock, flags);
    > + continue;
    > + }
    > +
    > + /*
    > + * If we didn't end up on the CPU we expected
    > + * to, try again. This can happen if a CPU
    > + * goes offline before we attempt to schedule
    > + * on it, but comes back online before we get
    > + * to this check.
    > + */
    > +
    > + if (smp_processor_id() != cpu) {
    > + spin_unlock_irqrestore(&rdp->lock, flags);
    > + goto retry;
    > + }
    > +
    > + /*
    > + * We are running on the CPU irq-disabled, so it
    > + * cannot go offline until we re-enable irqs.
    > + *
    > + * Advance the callbacks! We share normal RCU's
    > + * donelist, since callbacks are invoked the
    > + * same way in either case.
    > + */
    > +
    > + if (rdp->waitschedlist != NULL) {
    > + *rdp->donetail = rdp->waitschedlist;
    > + rdp->donetail = rdp->waitschedtail;
    > + needsoftirq = 1;
    > + }
    > + if (rdp->nextschedlist != NULL) {
    > + rdp->waitschedlist = rdp->nextschedlist;
    > + rdp->waitschedtail = rdp->nextschedtail;
    > + couldsleep = 0;
    > + couldsleepnext = 0;
    > + } else {
    > + rdp->waitschedlist = NULL;
    > + rdp->waitschedtail = &rdp->waitschedlist;
    > + }
    > + rdp->nextschedlist = NULL;
    > + rdp->nextschedtail = &rdp->nextschedlist;
    > +
    > + /* Mark sleep intention. */
    > +
    > + rdp->rcu_sched_sleeping = couldsleep;
    > +
    > + spin_unlock_irqrestore(&rdp->lock, flags);
    > +
    > + /* If we added callbacks to donelist, process. */
    > +
    > + if (needsoftirq)
    > + raise_softirq(RCU_SOFTIRQ);
    > + }
    > +
    > + /* If we saw callbacks on the last scan, go deal with them. */
    > +
    > + if (!couldsleep)
    > + continue;
    > +
    > + /* Attempt to block... */
    > +
    > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    > + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleep_prep) {
    > +
    > + /*
    > + * Someone posted a callback after we scanned.
    > + * Go take care of it.
    > + */
    > +
    > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > + couldsleepnext = 0;
    > + continue;
    > + }
    > +
    > + /* Block until the next person posts a callback. */
    > +
    > + rcu_ctrlblk.sched_sleep = rcu_sched_sleeping;
    > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > + __wait_event(rcu_ctrlblk.sched_wq,
    > + rcu_ctrlblk.sched_sleep != rcu_sched_sleeping);
    > + couldsleepnext = 0;
    > +
    > + } while (!kthread_should_stop());
    > +
    > + return (0);
    > }
    > -EXPORT_SYMBOL_GPL(__synchronize_sched);
    >
    > /*
    > * Check to see if any future RCU-related work will need to be done
    > @@ -1107,6 +1344,11 @@ void __init __rcu_init(void)
    > rdp->donetail = &rdp->donelist;
    > rdp->rcu_flipctr[0] = 0;
    > rdp->rcu_flipctr[1] = 0;
    > + rdp->nextschedlist = NULL;
    > + rdp->nextschedtail = &rdp->nextschedlist;
    > + rdp->waitschedlist = NULL;
    > + rdp->waitschedtail = &rdp->waitschedlist;
    > + rdp->rcu_sched_sleeping = 0;
    > }
    > register_cpu_notifier(&rcu_nb);
    >
    > @@ -1129,6 +1371,18 @@ void __init __rcu_init(void)
    > }
    >
    > /*
    > + * Late-boot-time RCU initialization that must wait until after scheduler
    > + * has been initialized.
    > + */
    > +void __init rcu_init_sched(void)
    > +{
    > + rcu_sched_grace_period_task = kthread_run(rcu_sched_grace_period,
    > + NULL,
    > + "rcu_sched_grace_period");
    > + WARN_ON(IS_ERR(rcu_sched_grace_period_task));
    > +}
    > +
    > +/*
    > * Deprecated, use synchronize_rcu() or synchronize_sched() instead.
    > */
    > void synchronize_kernel(void)


    --
    Mathieu Desnoyers
    Computer Engineering Ph.D. Student, Ecole Polytechnique de Montreal
    OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68

  4. Re: [PATCH,RFC] Add call_rcu_sched()

    On Mon, Mar 24, 2008 at 01:06:53AM -0400, Mathieu Desnoyers wrote:
    > * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
    > > Hello!
    > >
    > > Second cut of patch to provide the call_rcu_sched() needed for Mathieu's
    > > markers implementation. This is again to synchronize_sched() as
    > > call_rcu() is to synchronize_rcu().
    > >
    > > Should be fine for experimental use, but not ready for inclusion.
    > >
    > > Passes short rcutorture sessions, but should be treated with some caution
    > > given that very little of it is more than 24 hours old. Fixes since the
    > > first version include a bug that could result in indefinite blocking
    > > (spotted by Gautham Shenoy), better resiliency against CPU-hotplug
    > > operations, and other minor fixes.
    > >
    > > Known/suspected shortcomings:
    > >
    > > o Only moderately tested -- only short rcutorture sessions.
    > >
    > > o Need to add call_rcu_sched() testing to rcutorture.
    > >
    > > o If I remember correctly, an rcu_barrier_sched() is required
    > > (Mathieu?).
    > >

    >
    > Hi Paul,
    >
    > Thanks for this work, I'll give it a try (I'm just back from a weekend
    > away from the city). Yes, my code needs an rcu_barrier_sched() so it can
    > wait for all in-flight call_rcu_sched() callbacks to complete before it
    > re-uses the data structures at the next modification of the same marker.
    >
    > I think rcu_barrier_sched should be quite straightforward to implement
    > if we derive it from kernel/rcupdate.c:rcu_barrier. Actually, couldn't
    > we just rename rcu_barrier to a static helper (_rcu_barrier) and call it
    > with a parameter telling it which of call_rcu or call_rcu_sched to use?


    I was thinking in terms of a cpp macro sort of like synchronize_rcu_xxx(),
    but will look at this as well. In any case, I agree with sharing the
    mechanism between the two -- unless there is some screaming reason why
    we need to be able to do an rcu_barrier() and an rcu_barrier_sched()
    concurrently. ;-)
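
    For concreteness, one possible shape of that cpp-macro approach is
    sketched below. This is purely illustrative -- not part of either
    patch -- and assumes it would live in kernel/rcupdate.c next to the
    existing rcu_barrier_mutex, rcu_barrier_completion,
    rcu_barrier_cpu_count, rcu_barrier_head, and rcu_barrier_callback()
    helpers; the rcu_barrier_xxx and name##_enqueue names are made up by
    analogy with synchronize_rcu_xxx():

    #define rcu_barrier_xxx(name, call_rcu_func) \
    static void name##_enqueue(void *unused) \
    { \
    	/* Runs on each CPU in turn via on_each_cpu(), irqs disabled. */ \
    	struct rcu_head *head = &per_cpu(rcu_barrier_head, smp_processor_id()); \
     \
    	atomic_inc(&rcu_barrier_cpu_count); \
    	call_rcu_func(head, rcu_barrier_callback); \
    } \
     \
    void name(void) \
    { \
    	BUG_ON(in_interrupt()); \
    	/* Serialize barriers and protect against CPU hotplug. */ \
    	mutex_lock(&rcu_barrier_mutex); \
    	init_completion(&rcu_barrier_completion); \
    	atomic_set(&rcu_barrier_cpu_count, 0); \
    	/* Hold off grace periods until every CPU has queued its callback. */ \
    	rcu_read_lock(); \
    	on_each_cpu(name##_enqueue, NULL, 0, 1); \
    	rcu_read_unlock(); \
    	wait_for_completion(&rcu_barrier_completion); \
    	mutex_unlock(&rcu_barrier_mutex); \
    }

    Instantiating rcu_barrier_xxx(rcu_barrier, call_rcu) and
    rcu_barrier_xxx(rcu_barrier_sched, call_rcu_sched) would then give the
    same pair of functions as the enum-based _rcu_barrier() below, just
    stamped out at compile time rather than switched at run time.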

    > Something like this :
    >
    > Add rcu_barrier_sched
    >
    > Adds rcu_barrier_sched, which uses call_rcu_sched. It waits for each
    > in-flight call_rcu_sched callback to complete before it returns.
    >
    > Signed-off-by: Mathieu Desnoyers
    > ---
    > include/linux/rcupdate.h | 1 +
    > kernel/rcupdate.c | 40 +++++++++++++++++++++++++++++++++-------
    > 2 files changed, 34 insertions(+), 7 deletions(-)
    >
    > Index: linux-2.6-lttng/include/linux/rcupdate.h
    > ===================================================================
    > --- linux-2.6-lttng.orig/include/linux/rcupdate.h 2008-03-24 00:13:26.000000000 -0400
    > +++ linux-2.6-lttng/include/linux/rcupdate.h 2008-03-24 00:13:36.000000000 -0400
    > @@ -260,6 +260,7 @@ extern void call_rcu_bh(struct rcu_head
    > /* Exported common interfaces */
    > extern void synchronize_rcu(void);
    > extern void rcu_barrier(void);
    > +extern void rcu_barrier_sched(void);
    > extern long rcu_batches_completed(void);
    > extern long rcu_batches_completed_bh(void);
    >
    > Index: linux-2.6-lttng/kernel/rcupdate.c
    > ===================================================================
    > --- linux-2.6-lttng.orig/kernel/rcupdate.c 2008-03-24 00:07:15.000000000 -0400
    > +++ linux-2.6-lttng/kernel/rcupdate.c 2008-03-24 00:17:01.000000000 -0400
    > @@ -45,6 +45,11 @@
    > #include
    > #include
    >
    > +enum rcu_barrier {
    > + RCU_BARRIER_STD,
    > + RCU_BARRIER_SCHED,
    > +};
    > +
    > static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
    > static atomic_t rcu_barrier_cpu_count;
    > static DEFINE_MUTEX(rcu_barrier_mutex);
    > @@ -83,19 +88,23 @@ static void rcu_barrier_callback(struct
    > /*
    > * Called with preemption disabled, and from cross-cpu IRQ context.
    > */
    > -static void rcu_barrier_func(void *notused)
    > +static void rcu_barrier_func(void *type)
    > {
    > int cpu = smp_processor_id();
    > struct rcu_head *head = &per_cpu(rcu_barrier_head, cpu);
    >
    > atomic_inc(&rcu_barrier_cpu_count);
    > - call_rcu(head, rcu_barrier_callback);
    > + switch((enum rcu_barrier)type) {
    > + case RCU_BARRIER_STD:
    > + call_rcu(head, rcu_barrier_callback);
    > + break;
    > + case RCU_BARRIER_SCHED:
    > + call_rcu_sched(head, rcu_barrier_callback);
    > + break;
    > + }
    > }
    >
    > -/**
    > - * rcu_barrier - Wait until all the in-flight RCUs are complete.
    > - */
    > -void rcu_barrier(void)
    > +static void _rcu_barrier(enum rcu_barrier type)
    > {
    > BUG_ON(in_interrupt());
    > /* Take cpucontrol mutex to protect against CPU hotplug */
    > @@ -111,13 +120,30 @@ void rcu_barrier(void)
    > * until all the callbacks are queued.
    > */
    > rcu_read_lock();
    > - on_each_cpu(rcu_barrier_func, NULL, 0, 1);
    > + on_each_cpu(rcu_barrier_func, (void *)type, 0, 1);
    > rcu_read_unlock();
    > wait_for_completion(&rcu_barrier_completion);
    > mutex_unlock(&rcu_barrier_mutex);
    > }
    > +
    > +/**
    > + * rcu_barrier - Wait until all the in-flight RCUs are complete.
    > + */
    > +void rcu_barrier(void)
    > +{
    > + _rcu_barrier(RCU_BARRIER_STD);
    > +}
    > EXPORT_SYMBOL_GPL(rcu_barrier);
    >
    > +/**
    > + * rcu_barrier_sched - Wait until all the in-flight call_rcu_sched are complete.
    > + */
    > +void rcu_barrier_sched(void)
    > +{
    > + _rcu_barrier(RCU_BARRIER_SCHED);
    > +}
    > +EXPORT_SYMBOL_GPL(rcu_barrier_sched);
    > +
    > void __init rcu_init(void)
    > {
    > __rcu_init();
    >
    >
    > > o Interaction of this patch with CPU hotplug should be viewed
    > > with great suspicion.

    >
    > Fix call_rcu_sched wait


    There are definitely some problems here... Though I am seeing them
    in the sched_setaffinity() call rather than in the wait processing.

    > > o If there are no synchronize_sched() calls for more than two
    > > minutes, one can see messages of the form "INFO: task
    > > rcu_sched_grace:924 blocked for more than 120 seconds."
    > > Any thoughts on how to avoid this message? Should I be using
    > > something other than __wait_event() and wake_up(), which sleep
    > > uninterruptibly, thus triggering this message?
    > >

    >
    > Could you use __wait_event_interruptible and wake_up_interruptible
    > instead ? softlockup.c only seems to complain when uninterruptible tasks
    > are not scheduled for 2 minutes. I guess that when we receive a signal
    > we could simply go through another loop.


    I will give these a try.

    > Signed-off-by: Mathieu Desnoyers
    > ---
    > kernel/rcupreempt.c | 9 ++++++---
    > 1 file changed, 6 insertions(+), 3 deletions(-)
    >
    > Index: linux-2.6-lttng/kernel/rcupreempt.c
    > ===================================================================
    > --- linux-2.6-lttng.orig/kernel/rcupreempt.c 2008-03-24 00:26:27.000000000 -0400
    > +++ linux-2.6-lttng/kernel/rcupreempt.c 2008-03-24 00:33:47.000000000 -0400
    > @@ -1074,7 +1074,7 @@ void call_rcu_sched(struct rcu_head *hea
    > rcu_ctrlblk.sched_sleep = rcu_sched_not_sleeping;
    > spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > if (wake_gp)
    > - wake_up(&rcu_ctrlblk.sched_wq);
    > + wake_up_interruptible(&rcu_ctrlblk.sched_wq);
    > }
    > }
    > EXPORT_SYMBOL_GPL(call_rcu_sched);
    > @@ -1097,6 +1097,7 @@ rcu_sched_grace_period(void *arg)
    > int couldsleep; /* might sleep after current pass. */
    > int couldsleepnext = 0; /* might sleep after next pass. */
    > int cpu;
    > + int ret;
    > long err;
    > unsigned long flags;
    > int needsoftirq;
    > @@ -1242,8 +1243,10 @@ retry:
    >
    > rcu_ctrlblk.sched_sleep = rcu_sched_sleeping;
    > spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > - __wait_event(rcu_ctrlblk.sched_wq,
    > - rcu_ctrlblk.sched_sleep != rcu_sched_sleeping);
    > + ret = 0;
    > + __wait_event_interruptible(rcu_ctrlblk.sched_wq,
    > + rcu_ctrlblk.sched_sleep != rcu_sched_sleeping,
    > + ret);


    Don't we have to do something here to clear signal state if we are
    ever to block again? Maybe something like the following?

    flush_signals(current);

    Or am I missing something?

    > couldsleepnext = 0;
    >
    > } while (!kthread_should_stop());
    >
    > > One other thing -- this patch also fixes a long-standing bug in the
    > > earlier preemptable-RCU implementation of synchronize_rcu() that could
    > > result in loss of concurrent external changes to a task's CPU affinity
    > > mask. I have lost track of who reported this...

    >
    > That's always good


    Fixing the bug or losing track? ;-)

    Thanx, Paul

    > Mathieu
    >
    > > Signed-off-by: Paul E. McKenney
    > > ---
    > >
    > > include/linux/rcuclassic.h | 3
    > > include/linux/rcupdate.h | 22 +++
    > > include/linux/rcupreempt.h | 15 ++
    > > init/main.c | 1
    > > kernel/rcupdate.c | 20 ---
    > > kernel/rcupreempt.c | 276 +++++++++++++++++++++++++++++++++++++++++++--
    > > 6 files changed, 308 insertions(+), 29 deletions(-)
    > >
    > > diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcuclassic.h linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcuclassic.h
    > > --- linux-2.6.25-rc6/include/linux/rcuclassic.h 2008-03-16 17:45:16.000000000 -0700
    > > +++ linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcuclassic.h 2008-03-21 04:27:31.000000000 -0700
    > > @@ -153,7 +153,10 @@ extern struct lockdep_map rcu_lock_map;
    > >
    > > #define __synchronize_sched() synchronize_rcu()
    > >
    > > +#define call_rcu_sched(head, func) call_rcu(head, func)
    > > +
    > > extern void __rcu_init(void);
    > > +#define rcu_init_sched() do { } while (0)
    > > extern void rcu_check_callbacks(int cpu, int user);
    > > extern void rcu_restart_cpu(int cpu);
    > >
    > > diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcupdate.h linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupdate.h
    > > --- linux-2.6.25-rc6/include/linux/rcupdate.h 2008-03-16 17:45:16.000000000 -0700
    > > +++ linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupdate.h 2008-03-20 21:10:42.000000000 -0700
    > > @@ -42,6 +42,7 @@
    > > #include
    > > #include
    > > #include
    > > +#include
    > >
    > > /**
    > > * struct rcu_head - callback structure for use with RCU
    > > @@ -182,6 +183,27 @@ struct rcu_head {
    > > (p) = (v); \
    > > })
    > >
    > > +/* Infrastructure to implement the synchronize_() primitives. */
    > > +
    > > +struct rcu_synchronize {
    > > + struct rcu_head head;
    > > + struct completion completion;
    > > +};
    > > +
    > > +extern void wakeme_after_rcu(struct rcu_head *head);
    > > +
    > > +#define synchronize_rcu_xxx(name, func) \
    > > +void name(void) \
    > > +{ \
    > > + struct rcu_synchronize rcu; \
    > > + \
    > > + init_completion(&rcu.completion); \
    > > + /* Will wake me after RCU finished. */ \
    > > + func(&rcu.head, wakeme_after_rcu); \
    > > + /* Wait for it. */ \
    > > + wait_for_completion(&rcu.completion); \
    > > +}
    > > +
    > > /**
    > > * synchronize_sched - block until all CPUs have exited any non-preemptive
    > > * kernel code sequences.
    > > diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcupreempt.h linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupreempt.h
    > > --- linux-2.6.25-rc6/include/linux/rcupreempt.h 2008-03-16 17:45:16.000000000 -0700
    > > +++ linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupreempt.h 2008-03-21 04:31:29.000000000 -0700
    > > @@ -46,6 +46,20 @@
    > > #define rcu_bh_qsctr_inc(cpu)
    > > #define call_rcu_bh(head, rcu) call_rcu(head, rcu)
    > >
    > > +/**
    > > + * call_rcu_sched - Queue RCU callback for invocation after sched grace period.
    > > + * @head: structure to be used for queueing the RCU updates.
    > > + * @func: actual update function to be invoked after the grace period
    > > + *
    > > + * The update function will be invoked some time after a full
    > > + * synchronize_sched()-style grace period elapses, in other words after
    > > + * all currently executing preempt-disabled sections of code (including
    > > + * hardirq handlers, NMI handlers, and local_irq_save() blocks) have
    > > + * completed.
    > > + */
    > > +extern void call_rcu_sched(struct rcu_head *head,
    > > + void (*func)(struct rcu_head *head));
    > > +
    > > extern void __rcu_read_lock(void) __acquires(RCU);
    > > extern void __rcu_read_unlock(void) __releases(RCU);
    > > extern int rcu_pending(int cpu);
    > > @@ -57,6 +71,7 @@ extern int rcu_needs_cpu(int cpu);
    > > extern void __synchronize_sched(void);
    > >
    > > extern void __rcu_init(void);
    > > +extern void rcu_init_sched(void);
    > > extern void rcu_check_callbacks(int cpu, int user);
    > > extern void rcu_restart_cpu(int cpu);
    > > extern long rcu_batches_completed(void);
    > > diff -urpNa -X dontdiff linux-2.6.25-rc6/init/main.c linux-2.6.25-rc6-C1-call_rcu_sched/init/main.c
    > > --- linux-2.6.25-rc6/init/main.c 2008-03-16 17:45:17.000000000 -0700
    > > +++ linux-2.6.25-rc6-C1-call_rcu_sched/init/main.c 2008-03-21 04:31:31.000000000 -0700
    > > @@ -736,6 +736,7 @@ static void __init do_basic_setup(void)
    > > driver_init();
    > > init_irq_proc();
    > > do_initcalls();
    > > + rcu_init_sched();
    > > }
    > >
    > > static int __initdata nosoftlockup;
    > > diff -urpNa -X dontdiff linux-2.6.25-rc6/kernel/rcupdate.c linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupdate.c
    > > --- linux-2.6.25-rc6/kernel/rcupdate.c 2008-03-16 17:45:17.000000000 -0700
    > > +++ linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupdate.c 2008-03-20 21:10:39.000000000 -0700
    > > @@ -39,18 +39,12 @@
    > > #include
    > > #include
    > > #include
    > > -#include
    > > #include
    > > #include
    > > #include
    > > #include
    > > #include
    > >
    > > -struct rcu_synchronize {
    > > - struct rcu_head head;
    > > - struct completion completion;
    > > -};
    > > -
    > > static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
    > > static atomic_t rcu_barrier_cpu_count;
    > > static DEFINE_MUTEX(rcu_barrier_mutex);
    > > @@ -60,7 +54,7 @@ static struct completion rcu_barrier_com
    > > * Awaken the corresponding synchronize_rcu() instance now that a
    > > * grace period has elapsed.
    > > */
    > > -static void wakeme_after_rcu(struct rcu_head *head)
    > > +void wakeme_after_rcu(struct rcu_head *head)
    > > {
    > > struct rcu_synchronize *rcu;
    > >
    > > @@ -77,17 +71,7 @@ static void wakeme_after_rcu(struct rcu_
    > > * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
    > > * and may be nested.
    > > */
    > > -void synchronize_rcu(void)
    > > -{
    > > - struct rcu_synchronize rcu;
    > > -
    > > - init_completion(&rcu.completion);
    > > - /* Will wake me after RCU finished */
    > > - call_rcu(&rcu.head, wakeme_after_rcu);
    > > -
    > > - /* Wait for it */
    > > - wait_for_completion(&rcu.completion);
    > > -}
    > > +synchronize_rcu_xxx(synchronize_rcu, call_rcu)
    > > EXPORT_SYMBOL_GPL(synchronize_rcu);
    > >
    > > static void rcu_barrier_callback(struct rcu_head *notused)
    > > diff -urpNa -X dontdiff linux-2.6.25-rc6/kernel/rcupreempt.c linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupreempt.c
    > > --- linux-2.6.25-rc6/kernel/rcupreempt.c 2008-03-16 17:45:17.000000000 -0700
    > > +++ linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupreempt.c 2008-03-21 12:44:24.000000000 -0700
    > > @@ -46,6 +46,7 @@
    > > #include
    > > #include
    > > #include
    > > +#include
    > > #include
    > > #include
    > > #include
    > > @@ -87,9 +88,14 @@ struct rcu_data {
    > > struct rcu_head **nexttail;
    > > struct rcu_head *waitlist[GP_STAGES];
    > > struct rcu_head **waittail[GP_STAGES];
    > > - struct rcu_head *donelist;
    > > + struct rcu_head *donelist; /* from waitlist & waitschedlist */
    > > struct rcu_head **donetail;
    > > long rcu_flipctr[2];
    > > + struct rcu_head *nextschedlist;
    > > + struct rcu_head **nextschedtail;
    > > + struct rcu_head *waitschedlist;
    > > + struct rcu_head **waitschedtail;
    > > + int rcu_sched_sleeping;
    > > #ifdef CONFIG_RCU_TRACE
    > > struct rcupreempt_trace trace;
    > > #endif /* #ifdef CONFIG_RCU_TRACE */
    > > @@ -131,11 +137,24 @@ enum rcu_try_flip_states {
    > > rcu_try_flip_waitmb_state,
    > > };
    > >
    > > +/*
    > > + * States for rcu_ctrlblk.rcu_sched_sleep.
    > > + */
    > > +
    > > +enum rcu_sched_sleep_states {
    > > + rcu_sched_not_sleeping, /* Not sleeping, callbacks need GP. */
    > > + rcu_sched_sleep_prep, /* Thinking of sleeping, rechecking. */
    > > + rcu_sched_sleeping, /* Sleeping, awaken if GP needed. */
    > > +};
    > > +
    > > struct rcu_ctrlblk {
    > > spinlock_t fliplock; /* Protect state-machine transitions. */
    > > long completed; /* Number of last completed batch. */
    > > enum rcu_try_flip_states rcu_try_flip_state; /* The current state of
    > > the rcu state machine */
    > > + spinlock_t schedlock; /* Protect rcu_sched sleep state. */
    > > + enum rcu_sched_sleep_states sched_sleep; /* rcu_sched state. */
    > > + wait_queue_head_t sched_wq; /* Place for rcu_sched to sleep. */
    > > };
    > >
    > > static DEFINE_PER_CPU(struct rcu_data, rcu_data);
    > > @@ -143,8 +162,12 @@ static struct rcu_ctrlblk rcu_ctrlblk =
    > > .fliplock = __SPIN_LOCK_UNLOCKED(rcu_ctrlblk.fliplock),
    > > .completed = 0,
    > > .rcu_try_flip_state = rcu_try_flip_idle_state,
    > > + .schedlock = __SPIN_LOCK_UNLOCKED(rcu_ctrlblk.schedlock),
    > > + .sched_sleep = rcu_sched_not_sleeping,
    > > + .sched_wq = __WAIT_QUEUE_HEAD_INITIALIZER(rcu_ctrlblk.sched_wq),
    > > };
    > >
    > > +static struct task_struct *rcu_sched_grace_period_task;
    > >
    > > #ifdef CONFIG_RCU_TRACE
    > > static char *rcu_try_flip_state_names[] =
    > > @@ -871,6 +894,8 @@ void rcu_offline_cpu(int cpu)
    > > struct rcu_head *list = NULL;
    > > unsigned long flags;
    > > struct rcu_data *rdp = RCU_DATA_CPU(cpu);
    > > + struct rcu_head *schedlist = NULL;
    > > + struct rcu_head **schedtail = &schedlist;
    > > struct rcu_head **tail = &list;
    > >
    > > /*
    > > @@ -884,6 +909,11 @@ void rcu_offline_cpu(int cpu)
    > > rcu_offline_cpu_enqueue(rdp->waitlist[i], rdp->waittail[i],
    > > list, tail);
    > > rcu_offline_cpu_enqueue(rdp->nextlist, rdp->nexttail, list, tail);
    > > + rcu_offline_cpu_enqueue(rdp->waitschedlist, rdp->waitschedtail,
    > > + schedlist, schedtail);
    > > + rcu_offline_cpu_enqueue(rdp->nextschedlist, rdp->nextschedtail,
    > > + schedlist, schedtail);
    > > + rdp->rcu_sched_sleeping = 0;
    > > spin_unlock_irqrestore(&rdp->lock, flags);
    > > rdp->waitlistcount = 0;
    > >
    > > @@ -924,16 +954,35 @@ void rcu_offline_cpu(int cpu)
    > > *rdp->nexttail = list;
    > > if (list)
    > > rdp->nexttail = tail;
    > > + *rdp->nextschedtail = schedlist;
    > > + if (schedlist)
    > > + rdp->nextschedtail = schedtail;
    > > spin_unlock_irqrestore(&rdp->lock, flags);
    > > }
    > >
    > > void __devinit rcu_online_cpu(int cpu)
    > > {
    > > unsigned long flags;
    > > + struct rcu_data *rdp;
    > >
    > > spin_lock_irqsave(&rcu_ctrlblk.fliplock, flags);
    > > cpu_set(cpu, rcu_cpu_online_map);
    > > spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, flags);
    > > +
    > > + /*
    > > + * The rcu_sched grace-period processing might have bypassed
    > > + * this CPU, given that it was not in the rcu_cpu_online_map
    > > + * when the grace-period scan started. This means that the
    > > + * grace-period task might sleep. So make sure that if this
    > > + * should happen, the first callback posted to this CPU will
    > > + * wake up the grace-period task if need be.
    > > + */
    > > +
    > > + local_irq_save(flags);
    > > + rdp = RCU_DATA_ME();
    > > + spin_lock(&rdp->lock);
    > > + rdp->rcu_sched_sleeping = 1;
    > > + spin_unlock_irqrestore(&rdp->lock, flags);
    > > }
    > >
    > > #else /* #ifdef CONFIG_HOTPLUG_CPU */
    > > @@ -993,26 +1042,214 @@ void call_rcu(struct rcu_head *head, voi
    > > }
    > > EXPORT_SYMBOL_GPL(call_rcu);
    > >
    > > +void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
    > > +{
    > > + unsigned long flags;
    > > + struct rcu_data *rdp;
    > > + int wake_gp = 0;
    > > +
    > > + head->func = func;
    > > + head->next = NULL;
    > > + local_irq_save(flags);
    > > + rdp = RCU_DATA_ME();
    > > + spin_lock(&rdp->lock);
    > > + *rdp->nextschedtail = head;
    > > + rdp->nextschedtail = &head->next;
    > > + if (rdp->rcu_sched_sleeping) {
    > > +
    > > + /* Grace-period processing might be sleeping... */
    > > +
    > > + rdp->rcu_sched_sleeping = 0;
    > > + wake_gp = 1;
    > > + }
    > > + spin_unlock(&rdp->lock);
    > > + local_irq_restore(flags);
    > > + if (wake_gp) {
    > > +
    > > + /* Wake up grace-period processing, unless someone beat us. */
    > > +
    > > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    > > + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleeping)
    > > + wake_gp = 0;
    > > + rcu_ctrlblk.sched_sleep = rcu_sched_not_sleeping;
    > > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > > + if (wake_gp)
    > > + wake_up(&rcu_ctrlblk.sched_wq);
    > > + }
    > > +}
    > > +EXPORT_SYMBOL_GPL(call_rcu_sched);
    > > +
    > > /*
    > > * Wait until all currently running preempt_disable() code segments
    > > * (including hardware-irq-disable segments) complete. Note that
    > > * in -rt this does -not- necessarily result in all currently executing
    > > * interrupt -handlers- having completed.
    > > */
    > > -void __synchronize_sched(void)
    > > +synchronize_rcu_xxx(__synchronize_sched, call_rcu_sched)
    > > +EXPORT_SYMBOL_GPL(__synchronize_sched);
    > > +
    > > +/*
    > > + * kthread function that manages call_rcu_sched grace periods.
    > > + */
    > > +static int
    > > +rcu_sched_grace_period(void *arg)
    > > {
    > > - cpumask_t oldmask;
    > > + int couldsleep; /* might sleep after current pass. */
    > > + int couldsleepnext = 0; /* might sleep after next pass. */
    > > int cpu;
    > > + long err;
    > > + unsigned long flags;
    > > + int needsoftirq;
    > > + struct rcu_data *rdp;
    > >
    > > - if (sched_getaffinity(0, &oldmask) < 0)
    > > - oldmask = cpu_possible_map;
    > > - for_each_online_cpu(cpu) {
    > > - sched_setaffinity(0, cpumask_of_cpu(cpu));
    > > - schedule();
    > > - }
    > > - sched_setaffinity(0, oldmask);
    > > + /*
    > > + * Each pass through the following loop handles one
    > > + * rcu_sched grace period cycle.
    > > + */
    > > +
    > > + do {
    > > +
    > > + /*
    > > + * Sleep for about an RCU grace-period's worth to
    > > + * allow better batching and to consume less CPU.
    > > + */
    > > +
    > > + schedule_timeout_interruptible(HZ / 20);
    > > +
    > > + /*
    > > + * If there was nothing to do last time, prepare to
    > > + * sleep at the end of the current grace period cycle.
    > > + */
    > > +
    > > + couldsleep = couldsleepnext;
    > > + couldsleepnext = 1;
    > > + if (couldsleep) {
    > > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    > > + rcu_ctrlblk.sched_sleep = rcu_sched_sleep_prep;
    > > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > > + }
    > > +
    > > + /*
    > > + * Schedule on each CPU in turn, advancing callbacks
    > > + * as we go. We will have visited each CPU between
    > > + * the time we move a callback from the nextsched
    > > + * list and the time we move that callback to the
    > > + * done list. Now, a given CPU might come online
    > > + * during that interval, but that means that it
    > > + * was offline when we started, so we can safely
    > > + * ignore it.
    > > + */
    > > +
    > > + for_each_online_cpu(cpu) {
    > > +
    > > +retry:
    > > +
    > > + /* Initialize and schedule onto current CPU. */
    > > +
    > > + needsoftirq = 0;
    > > + err = sched_setaffinity(0, cpumask_of_cpu(cpu));
    > > + if (err < 0) {
    > > + printk(KERN_WARNING "sched_setaffinity(%d) error: %ld, cpu_is_offline: %ld\n", cpu, err, cpu_is_offline(cpu));
    > > + schedule_timeout_interruptible(HZ);
    > > + continue;
    > > + }
    > > +
    > > + /*
    > > + * Get a reference to this CPU's rcu_data
    > > + * structure, lock it, and verify that this
    > > + * CPU is still online (skip it otherwise).
    > > + */
    > > +
    > > + rdp = RCU_DATA_CPU(cpu);
    > > + spin_lock_irqsave(&rdp->lock, flags);
    > > + if (cpu_is_offline(cpu)) {
    > > + spin_unlock_irqrestore(&rdp->lock, flags);
    > > + continue;
    > > + }
    > > +
    > > + /*
    > > + * If we didn't end up on the CPU we expected
    > > + * to, try again. This can happen if a CPU
    > > + * goes offline before we attempt to schedule
    > > + * on it, but comes back online before we get
    > > + * to this check.
    > > + */
    > > +
    > > + if (smp_processor_id() != cpu) {
    > > + spin_unlock_irqrestore(&rdp->lock, flags);
    > > + goto retry;
    > > + }
    > > +
    > > + /*
    > > + * We are running on the CPU irq-disabled, so it
    > > + * cannot go offline until we re-enable irqs.
    > > + *
    > > + * Advance the callbacks! We share normal RCU's
    > > + * donelist, since callbacks are invoked the
    > > + * same way in either case.
    > > + */
    > > +
    > > + if (rdp->waitschedlist != NULL) {
    > > + *rdp->donetail = rdp->waitschedlist;
    > > + rdp->donetail = rdp->waitschedtail;
    > > + needsoftirq = 1;
    > > + }
    > > + if (rdp->nextschedlist != NULL) {
    > > + rdp->waitschedlist = rdp->nextschedlist;
    > > + rdp->waitschedtail = rdp->nextschedtail;
    > > + couldsleep = 0;
    > > + couldsleepnext = 0;
    > > + } else {
    > > + rdp->waitschedlist = NULL;
    > > + rdp->waitschedtail = &rdp->waitschedlist;
    > > + }
    > > + rdp->nextschedlist = NULL;
    > > + rdp->nextschedtail = &rdp->nextschedlist;
    > > +
    > > + /* Mark sleep intention. */
    > > +
    > > + rdp->rcu_sched_sleeping = couldsleep;
    > > +
    > > + spin_unlock_irqrestore(&rdp->lock, flags);
    > > +
    > > + /* If we added callbacks to donelist, process. */
    > > +
    > > + if (needsoftirq)
    > > + raise_softirq(RCU_SOFTIRQ);
    > > + }
    > > +
    > > + /* If we saw callbacks on the last scan, go deal with them. */
    > > +
    > > + if (!couldsleep)
    > > + continue;
    > > +
    > > + /* Attempt to block... */
    > > +
    > > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    > > + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleep_prep) {
    > > +
    > > + /*
    > > + * Someone posted a callback after we scanned.
    > > + * Go take care of it.
    > > + */
    > > +
    > > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > > + couldsleepnext = 0;
    > > + continue;
    > > + }
    > > +
    > > + /* Block until the next person posts a callback. */
    > > +
    > > + rcu_ctrlblk.sched_sleep = rcu_sched_sleeping;
    > > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > > + __wait_event(rcu_ctrlblk.sched_wq,
    > > + rcu_ctrlblk.sched_sleep != rcu_sched_sleeping);
    > > + couldsleepnext = 0;
    > > +
    > > + } while (!kthread_should_stop());
    > > +
    > > + return (0);
    > > }
    > > -EXPORT_SYMBOL_GPL(__synchronize_sched);
    > >
    > > /*
    > > * Check to see if any future RCU-related work will need to be done
    > > @@ -1107,6 +1344,11 @@ void __init __rcu_init(void)
    > > rdp->donetail = &rdp->donelist;
    > > rdp->rcu_flipctr[0] = 0;
    > > rdp->rcu_flipctr[1] = 0;
    > > + rdp->nextschedlist = NULL;
    > > + rdp->nextschedtail = &rdp->nextschedlist;
    > > + rdp->waitschedlist = NULL;
    > > + rdp->waitschedtail = &rdp->waitschedlist;
    > > + rdp->rcu_sched_sleeping = 0;
    > > }
    > > register_cpu_notifier(&rcu_nb);
    > >
    > > @@ -1129,6 +1371,18 @@ void __init __rcu_init(void)
    > > }
    > >
    > > /*
    > > + * Late-boot-time RCU initialization that must wait until after scheduler
    > > + * has been initialized.
    > > + */
    > > +void __init rcu_init_sched(void)
    > > +{
    > > + rcu_sched_grace_period_task = kthread_run(rcu_sched_grace_period,
    > > + NULL,
    > > + "rcu_sched_grace_period");
    > > + WARN_ON(IS_ERR(rcu_sched_grace_period_task));
    > > +}
    > > +
    > > +/*
    > > * Deprecated, use synchronize_rcu() or synchronize_sched() instead.
    > > */
    > > void synchronize_kernel(void)

    >
    > --
    > Mathieu Desnoyers
    > Computer Engineering Ph.D. Student, Ecole Polytechnique de Montreal
    > OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68


  5. Re: [PATCH,RFC] Add call_rcu_sched()

    * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
    > On Mon, Mar 24, 2008 at 01:06:53AM -0400, Mathieu Desnoyers wrote:
    > > * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:

    [...]
    > > > o Interaction of this patch with CPU hotplug should be viewed
    > > > with great suspicion.

    > >
    > > Fix call_rcu_sched wait

    >
    > There are definitely some problems here... Though I am seeing them
    > in the sched_setaffinity() call rather than in the wait processing.
    >


    Sorry for the misleading line: "Fix call_rcu_sched wait" was the title
    of the patch addressing the "rcu_sched_grace:924 blocked ..." problem below.

    > > > o If there are no synchronize_sched() calls for more than two
    > > > minutes, one can see messages of the form "INFO: task
    > > > rcu_sched_grace:924 blocked for more than 120 seconds."
    > > > Any thoughts on how to avoid this message? Should I be using
    > > > something other than __wait_event() and wake_up(), which sleep
    > > > uninterruptibly, thus triggering this message?
    > > >

    > >

    [...]
    > > Could you use __wait_event_interruptible and wake_up_interruptible
    > > instead ? softlockup.c only seems to complain when uninterruptible tasks
    > > are not scheduled for 2 minutes. I guess that when we receive a signal
    > > we could simply go through another loop.

    >
    > I will give these a try.
    >
    > > + ret = 0;
    > > + __wait_event_interruptible(rcu_ctrlblk.sched_wq,
    > > + rcu_ctrlblk.sched_sleep != rcu_sched_sleeping,
    > > + ret);

    >
    > Don't we have to do something here to clear signal state if we are
    > ever to block again? Maybe something like the following?
    >
    > flush_signals(current);
    >
    > Or am I missing something?
    >


    Good point, I would add:

        if (ret < 0)
                flush_signals(current);

    [...]
    > >
    > > That's always good

    >
    > Fixing the bug or losing track? ;-)
    >


    Fixing it, of course.

    New version of the fix-call-rcu-sched-wait.patch file below.

    Mathieu


    Fix call_rcu_sched wait

    > o If there are no synchronize_sched() calls for more than two
    > minutes, one can see messages of the form "INFO: task
    > rcu_sched_grace:924 blocked for more than 120 seconds."
    > Any thoughts on how to avoid this message? Should I be using
    > something other than __wait_event() and wake_up(), which sleep
    > uninterruptibly, thus triggering this message?
    >


    Could you use __wait_event_interruptible and wake_up_interruptible
    instead ? softlockup.c only seems to complain when uninterruptible tasks
    are not scheduled for 2 minutes. I guess that when we receive a signal
    we could simply go through another loop.

    - Changelog
    Reset signal state upon wakeup.

    Signed-off-by: Mathieu Desnoyers
    ---
    kernel/rcupreempt.c | 11 ++++++++---
    1 file changed, 8 insertions(+), 3 deletions(-)

    Index: linux-2.6-lttng/kernel/rcupreempt.c
    ===================================================================
    --- linux-2.6-lttng.orig/kernel/rcupreempt.c 2008-03-24 00:26:27.000000000 -0400
    +++ linux-2.6-lttng/kernel/rcupreempt.c 2008-03-24 09:57:28.000000000 -0400
    @@ -1074,7 +1074,7 @@ void call_rcu_sched(struct rcu_head *hea
    rcu_ctrlblk.sched_sleep = rcu_sched_not_sleeping;
    spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    if (wake_gp)
    - wake_up(&rcu_ctrlblk.sched_wq);
    + wake_up_interruptible(&rcu_ctrlblk.sched_wq);
    }
    }
    EXPORT_SYMBOL_GPL(call_rcu_sched);
    @@ -1097,6 +1097,7 @@ rcu_sched_grace_period(void *arg)
    int couldsleep; /* might sleep after current pass. */
    int couldsleepnext = 0; /* might sleep after next pass. */
    int cpu;
    + int ret;
    long err;
    unsigned long flags;
    int needsoftirq;
    @@ -1242,8 +1243,12 @@ retry:

    rcu_ctrlblk.sched_sleep = rcu_sched_sleeping;
    spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    - __wait_event(rcu_ctrlblk.sched_wq,
    - rcu_ctrlblk.sched_sleep != rcu_sched_sleeping);
    + ret = 0;
    + __wait_event_interruptible(rcu_ctrlblk.sched_wq,
    + rcu_ctrlblk.sched_sleep != rcu_sched_sleeping,
    + ret);
    + if (ret < 0)
    + flush_signals(current);
    couldsleepnext = 0;

    } while (!kthread_should_stop());

    --
    Mathieu Desnoyers
    Computer Engineering Ph.D. Student, Ecole Polytechnique de Montreal
    OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68

  6. Re: [PATCH,RFC] Initialize call_rcu_sched sooner

    Initialize call_rcu_sched sooner so it can be used in module_init stage.

    (needed for LTTng)
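
    To make the ordering requirement concrete, consider an illustrative
    built-in user -- hypothetical names throughout, none of this code is
    from the patches in this thread (assumes linux/rcupdate.h,
    linux/slab.h and linux/module.h; error handling omitted). Readers
    traverse the probe data under preempt_disable(), which is why the
    _sched flavors are the right ones to wait on:

    struct my_probe {
    	struct rcu_head rcu;
    	void (*func)(void *data);
    };

    static struct my_probe *active_probe;

    static void my_probe_free(struct rcu_head *head)
    {
    	kfree(container_of(head, struct my_probe, rcu));
    }

    static void my_probe_replace(struct my_probe *newp)
    {
    	struct my_probe *old = active_probe;

    	rcu_assign_pointer(active_probe, newp);
    	if (old)
    		call_rcu_sched(&old->rcu, my_probe_free);
    }

    static int __init my_tracer_init(void)
    {
    	/* For built-in code this runs from do_initcalls(). */
    	my_probe_replace(kzalloc(sizeof(struct my_probe), GFP_KERNEL));
    	return 0;
    }
    module_init(my_tracer_init);

    static void __exit my_tracer_exit(void)
    {
    	my_probe_replace(NULL);
    	rcu_barrier_sched();	/* wait for in-flight my_probe_free() calls */
    }
    module_exit(my_tracer_exit);

    With rcu_init_sched() run only after do_initcalls(), the callback
    posted by my_tracer_init() would merely sit queued until the
    grace-period kthread started, and anything in the initcall path that
    waited for such a callback (__synchronize_sched() or
    rcu_barrier_sched()) would block; moving rcu_init_sched() to the top
    of do_basic_setup() avoids that.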

    Signed-off-by: Mathieu Desnoyers
    ---
    init/main.c | 2 +-
    1 file changed, 1 insertion(+), 1 deletion(-)

    Index: linux-2.6-lttng/init/main.c
    ===================================================================
    --- linux-2.6-lttng.orig/init/main.c 2008-03-25 08:49:02.000000000 -0400
    +++ linux-2.6-lttng/init/main.c 2008-03-25 08:49:20.000000000 -0400
    @@ -738,13 +738,13 @@ static void __init do_initcalls(void)
    */
    static void __init do_basic_setup(void)
    {
    + rcu_init_sched();
    /* drivers will send hotplug events */
    init_workqueues();
    usermodehelper_init();
    driver_init();
    init_irq_proc();
    do_initcalls();
    - rcu_init_sched();
    }

    static int __initdata nosoftlockup;
    --
    Mathieu Desnoyers
    Computer Engineering Ph.D. Student, Ecole Polytechnique de Montreal
    OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68

  7. Re: [PATCH,RFC] Initialize call_rcu_sched sooner

    On Tue, Mar 25, 2008 at 08:53:08AM -0400, Mathieu Desnoyers wrote:
    > Initialize call_rcu_sched sooner so it can be used in module_init stage.
    >
    > (needed for LTTng)


    This passes smoke tests, so have incorporated it. Thank you, Mathieu!!!

    Thanx, Paul

    > Signed-off-by: Mathieu Desnoyers
    > ---
    > init/main.c | 2 +-
    > 1 file changed, 1 insertion(+), 1 deletion(-)
    >
    > Index: linux-2.6-lttng/init/main.c
    > ===================================================================
    > --- linux-2.6-lttng.orig/init/main.c 2008-03-25 08:49:02.000000000 -0400
    > +++ linux-2.6-lttng/init/main.c 2008-03-25 08:49:20.000000000 -0400
    > @@ -738,13 +738,13 @@ static void __init do_initcalls(void)
    > */
    > static void __init do_basic_setup(void)
    > {
    > + rcu_init_sched();
    > /* drivers will send hotplug events */
    > init_workqueues();
    > usermodehelper_init();
    > driver_init();
    > init_irq_proc();
    > do_initcalls();
    > - rcu_init_sched();
    > }
    >
    > static int __initdata nosoftlockup;
    > --
    > Mathieu Desnoyers
    > Computer Engineering Ph.D. Student, Ecole Polytechnique de Montreal
    > OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68


  8. Re: [PATCH,RFC] Add call_rcu_sched()

    Hello!

    Third cut of the patch to provide call_rcu_sched(). This is again to
    synchronize_sched() as call_rcu() is to synchronize_rcu().

    Should be fine for experimental use, but not ready for inclusion.

    Passes multi-hour rcutorture sessions with concurrent CPU hotplugging.

    Changes since the first version include a fix for a bug that could
    result in indefinite blocking (spotted by Gautham Shenoy), better
    resiliency against CPU-hotplug operations, and other minor fixes.

    Fixes since the second version include reworking grace-period detection
    to avoid deadlocks that could happen when running concurrently with
    CPU hotplug, adding Mathieu's fix to avoid the softlockup messages,
    as well as Mathieu's fix to allow use earlier in boot.

    Known/suspected shortcomings:

    o Only moderately tested on x86-64 and POWER -- a few hours of
    rcutorture with concurrent CPU hotplugging. In particular, I
    still do not trust the sleep/wakeup logic between call_rcu_sched()
    and rcu_sched_grace_period() (summarized in the sketch after this
    list).

    o Need to add call_rcu_sched() testing to rcutorture.

    o Still needs rcu_barrier_sched() -- intending to incorporate
    the version Mathieu provided.
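
    For reference, the sleep/wakeup handshake in question boils down to
    the two steps sketched below. This is only an illustrative
    distillation of the patch that follows: the helper names
    rcu_sched_wake_gp() and rcu_sched_try_sleep() are made up, the patch
    open-codes this logic in call_rcu_sched() and rcu_sched_grace_period(),
    and the interruptible wait plus flush_signals() from Mathieu's
    softlockup fix is omitted here for brevity.

    /*
     * Called by call_rcu_sched() after queuing a callback, but only if it
     * found this CPU's rdp->rcu_sched_sleeping flag set (and cleared it
     * under rdp->lock).
     */
    static void rcu_sched_wake_gp(void)
    {
    	unsigned long flags;
    	int wake_gp = 1;

    	spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    	if (rcu_ctrlblk.sched_sleep != rcu_sched_sleeping)
    		wake_gp = 0;	/* kthread is not actually asleep */
    	rcu_ctrlblk.sched_sleep = rcu_sched_not_sleeping;
    	spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    	if (wake_gp)
    		wake_up(&rcu_ctrlblk.sched_wq);
    }

    /*
     * Called by the grace-period kthread after a scan that found no
     * callbacks, having set sched_sleep to rcu_sched_sleep_prep before
     * the scan.
     */
    static void rcu_sched_try_sleep(void)
    {
    	unsigned long flags;

    	spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    	if (rcu_ctrlblk.sched_sleep != rcu_sched_sleep_prep) {
    		/* A callback was posted after the scan; don't sleep. */
    		spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    		return;
    	}
    	rcu_ctrlblk.sched_sleep = rcu_sched_sleeping;
    	spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    	__wait_event(rcu_ctrlblk.sched_wq,
    		     rcu_ctrlblk.sched_sleep != rcu_sched_sleeping);
    }

    The intermediate rcu_sched_sleep_prep state is what closes the missed
    wakeup window: a call_rcu_sched() arriving between the kthread's scan
    and its decision to block resets sched_sleep, so rcu_sched_try_sleep()
    backs out instead of sleeping with callbacks pending.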

    This patch also fixes a long-standing bug in the earlier preemptable-RCU
    implementation of synchronize_rcu() that could result in loss of
    concurrent external changes to a task's CPU affinity mask. I still cannot
    remember who reported this...

    Signed-off-by: Paul E. McKenney
    Signed-off-by: Mathieu Desnoyers
    ---

    include/linux/rcuclassic.h | 3
    include/linux/rcupdate.h | 22 ++
    include/linux/rcupreempt.h | 42 ++++
    init/main.c | 1
    kernel/rcupdate.c | 20 --
    kernel/rcupreempt.c | 401 ++++++++++++++++++++++++++++++++++++++++-----
    6 files changed, 427 insertions(+), 62 deletions(-)

    diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcuclassic.h linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcuclassic.h
    --- linux-2.6.25-rc6/include/linux/rcuclassic.h 2008-03-16 17:45:16.000000000 -0700
    +++ linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcuclassic.h 2008-03-21 04:27:31.000000000 -0700
    @@ -153,7 +153,10 @@ extern struct lockdep_map rcu_lock_map;

    #define __synchronize_sched() synchronize_rcu()

    +#define call_rcu_sched(head, func) call_rcu(head, func)
    +
    extern void __rcu_init(void);
    +#define rcu_init_sched() do { } while (0)
    extern void rcu_check_callbacks(int cpu, int user);
    extern void rcu_restart_cpu(int cpu);

    diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcupdate.h linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupdate.h
    --- linux-2.6.25-rc6/include/linux/rcupdate.h 2008-03-16 17:45:16.000000000 -0700
    +++ linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupdate.h 2008-03-20 21:10:42.000000000 -0700
    @@ -42,6 +42,7 @@
    #include
    #include
    #include
    +#include

    /**
    * struct rcu_head - callback structure for use with RCU
    @@ -182,6 +183,27 @@ struct rcu_head {
    (p) = (v); \
    })

    +/* Infrastructure to implement the synchronize_() primitives. */
    +
    +struct rcu_synchronize {
    + struct rcu_head head;
    + struct completion completion;
    +};
    +
    +extern void wakeme_after_rcu(struct rcu_head *head);
    +
    +#define synchronize_rcu_xxx(name, func) \
    +void name(void) \
    +{ \
    + struct rcu_synchronize rcu; \
    + \
    + init_completion(&rcu.completion); \
    + /* Will wake me after RCU finished. */ \
    + func(&rcu.head, wakeme_after_rcu); \
    + /* Wait for it. */ \
    + wait_for_completion(&rcu.completion); \
    +}
    +
    /**
    * synchronize_sched - block until all CPUs have exited any non-preemptive
    * kernel code sequences.
    diff -urpNa -X dontdiff linux-2.6.25-rc6/include/linux/rcupreempt.h linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupreempt.h
    --- linux-2.6.25-rc6/include/linux/rcupreempt.h 2008-03-16 17:45:16.000000000 -0700
    +++ linux-2.6.25-rc6-C1-call_rcu_sched/include/linux/rcupreempt.h 2008-04-04 18:17:54.000000000 -0700
    @@ -42,10 +42,39 @@
    #include
    #include

    -#define rcu_qsctr_inc(cpu)
    +struct rcu_dyntick_sched {
    + int dynticks;
    + int dynticks_snap;
    + int sched_qs;
    + int sched_qs_snap;
    + int sched_dynticks_snap;
    +};
    +
    +DECLARE_PER_CPU(struct rcu_dyntick_sched, rcu_dyntick_sched);
    +
    +static inline void rcu_qsctr_inc(int cpu)
    +{
    + struct rcu_dyntick_sched *rdssp = &per_cpu(rcu_dyntick_sched, cpu);
    +
    + rdssp->sched_qs++;
    +}
    #define rcu_bh_qsctr_inc(cpu)
    #define call_rcu_bh(head, rcu) call_rcu(head, rcu)

    +/**
    + * call_rcu_sched - Queue RCU callback for invocation after sched grace period.
    + * @head: structure to be used for queueing the RCU updates.
    + * @func: actual update function to be invoked after the grace period
    + *
    + * The update function will be invoked some time after a full
    + * synchronize_sched()-style grace period elapses, in other words after
    + * all currently executing preempt-disabled sections of code (including
    + * hardirq handlers, NMI handlers, and local_irq_save() blocks) have
    + * completed.
    + */
    +extern void call_rcu_sched(struct rcu_head *head,
    + void (*func)(struct rcu_head *head));
    +
    extern void __rcu_read_lock(void) __acquires(RCU);
    extern void __rcu_read_unlock(void) __releases(RCU);
    extern int rcu_pending(int cpu);
    @@ -57,6 +86,7 @@ extern int rcu_needs_cpu(int cpu);
    extern void __synchronize_sched(void);

    extern void __rcu_init(void);
    +extern void rcu_init_sched(void);
    extern void rcu_check_callbacks(int cpu, int user);
    extern void rcu_restart_cpu(int cpu);
    extern long rcu_batches_completed(void);
    @@ -83,20 +113,20 @@ extern struct rcupreempt_trace *rcupreem
    struct softirq_action;

    #ifdef CONFIG_NO_HZ
    -DECLARE_PER_CPU(long, dynticks_progress_counter);
    +DECLARE_PER_CPU(struct rcu_dyntick_sched, rcu_dyntick_sched);

    static inline void rcu_enter_nohz(void)
    {
    - __get_cpu_var(dynticks_progress_counter)++;
    - WARN_ON(__get_cpu_var(dynticks_progress_counter) & 0x1);
    + __get_cpu_var(rcu_dyntick_sched).dynticks++;
    + WARN_ON(__get_cpu_var(rcu_dyntick_sched).dynticks & 0x1);
    mb();
    }

    static inline void rcu_exit_nohz(void)
    {
    mb();
    - __get_cpu_var(dynticks_progress_counter)++;
    - WARN_ON(!(__get_cpu_var(dynticks_progress_counter) & 0x1));
    + __get_cpu_var(rcu_dyntick_sched).dynticks++;
    + WARN_ON(!(__get_cpu_var(rcu_dyntick_sched).dynticks & 0x1));
    }

    #else /* CONFIG_NO_HZ */
    diff -urpNa -X dontdiff linux-2.6.25-rc6/init/main.c linux-2.6.25-rc6-C1-call_rcu_sched/init/main.c
    --- linux-2.6.25-rc6/init/main.c 2008-03-16 17:45:17.000000000 -0700
    +++ linux-2.6.25-rc6-C1-call_rcu_sched/init/main.c 2008-03-25 08:31:17.000000000 -0700
    @@ -730,6 +730,7 @@ static void __init do_initcalls(void)
    */
    static void __init do_basic_setup(void)
    {
    + rcu_init_sched(); /* needed by module_init stage. */
    /* drivers will send hotplug events */
    init_workqueues();
    usermodehelper_init();
    diff -urpNa -X dontdiff linux-2.6.25-rc6/kernel/rcupdate.c linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupdate.c
    --- linux-2.6.25-rc6/kernel/rcupdate.c 2008-03-16 17:45:17.000000000 -0700
    +++ linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupdate.c 2008-03-20 21:10:39.000000000 -0700
    @@ -39,18 +39,12 @@
    #include
    #include
    #include
    -#include
    #include
    #include
    #include
    #include
    #include

    -struct rcu_synchronize {
    - struct rcu_head head;
    - struct completion completion;
    -};
    -
    static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
    static atomic_t rcu_barrier_cpu_count;
    static DEFINE_MUTEX(rcu_barrier_mutex);
    @@ -60,7 +54,7 @@ static struct completion rcu_barrier_com
    * Awaken the corresponding synchronize_rcu() instance now that a
    * grace period has elapsed.
    */
    -static void wakeme_after_rcu(struct rcu_head *head)
    +void wakeme_after_rcu(struct rcu_head *head)
    {
    struct rcu_synchronize *rcu;

    @@ -77,17 +71,7 @@ static void wakeme_after_rcu(struct rcu_
    * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
    * and may be nested.
    */
    -void synchronize_rcu(void)
    -{
    - struct rcu_synchronize rcu;
    -
    - init_completion(&rcu.completion);
    - /* Will wake me after RCU finished */
    - call_rcu(&rcu.head, wakeme_after_rcu);
    -
    - /* Wait for it */
    - wait_for_completion(&rcu.completion);
    -}
    +synchronize_rcu_xxx(synchronize_rcu, call_rcu)
    EXPORT_SYMBOL_GPL(synchronize_rcu);

    static void rcu_barrier_callback(struct rcu_head *notused)
    diff -urpNa -X dontdiff linux-2.6.25-rc6/kernel/rcupreempt.c linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupreempt.c
    --- linux-2.6.25-rc6/kernel/rcupreempt.c 2008-03-16 17:45:17.000000000 -0700
    +++ linux-2.6.25-rc6-C1-call_rcu_sched/kernel/rcupreempt.c 2008-04-04 18:16:34.000000000 -0700
    @@ -46,6 +46,7 @@
    #include
    #include
    #include
    +#include
    #include
    #include
    #include
    @@ -87,9 +88,14 @@ struct rcu_data {
    struct rcu_head **nexttail;
    struct rcu_head *waitlist[GP_STAGES];
    struct rcu_head **waittail[GP_STAGES];
    - struct rcu_head *donelist;
    + struct rcu_head *donelist; /* from waitlist & waitschedlist */
    struct rcu_head **donetail;
    long rcu_flipctr[2];
    + struct rcu_head *nextschedlist;
    + struct rcu_head **nextschedtail;
    + struct rcu_head *waitschedlist;
    + struct rcu_head **waitschedtail;
    + int rcu_sched_sleeping;
    #ifdef CONFIG_RCU_TRACE
    struct rcupreempt_trace trace;
    #endif /* #ifdef CONFIG_RCU_TRACE */
    @@ -131,11 +137,24 @@ enum rcu_try_flip_states {
    rcu_try_flip_waitmb_state,
    };

    +/*
    + * States for rcu_ctrlblk.rcu_sched_sleep.
    + */
    +
    +enum rcu_sched_sleep_states {
    + rcu_sched_not_sleeping, /* Not sleeping, callbacks need GP. */
    + rcu_sched_sleep_prep, /* Thinking of sleeping, rechecking. */
    + rcu_sched_sleeping, /* Sleeping, awaken if GP needed. */
    +};
    +
    struct rcu_ctrlblk {
    spinlock_t fliplock; /* Protect state-machine transitions. */
    long completed; /* Number of last completed batch. */
    enum rcu_try_flip_states rcu_try_flip_state; /* The current state of
    the rcu state machine */
    + spinlock_t schedlock; /* Protect rcu_sched sleep state. */
    + enum rcu_sched_sleep_states sched_sleep; /* rcu_sched state. */
    + wait_queue_head_t sched_wq; /* Place for rcu_sched to sleep. */
    };

    static DEFINE_PER_CPU(struct rcu_data, rcu_data);
    @@ -143,8 +162,12 @@ static struct rcu_ctrlblk rcu_ctrlblk =
    .fliplock = __SPIN_LOCK_UNLOCKED(rcu_ctrlblk.fliplock),
    .completed = 0,
    .rcu_try_flip_state = rcu_try_flip_idle_state,
    + .schedlock = __SPIN_LOCK_UNLOCKED(rcu_ctrlblk.schedlock),
    + .sched_sleep = rcu_sched_not_sleeping,
    + .sched_wq = __WAIT_QUEUE_HEAD_INITIALIZER(rcu_ctrlblk.sched_wq),
    };

    +static struct task_struct *rcu_sched_grace_period_task;

    #ifdef CONFIG_RCU_TRACE
    static char *rcu_try_flip_state_names[] =
    @@ -413,32 +436,34 @@ static void __rcu_advance_callbacks(stru
    }
    }

    -#ifdef CONFIG_NO_HZ
    +DEFINE_PER_CPU_SHARED_ALIGNED(struct rcu_dyntick_sched, rcu_dyntick_sched) = {
    + .dynticks = 1,
    +};

    -DEFINE_PER_CPU(long, dynticks_progress_counter) = 1;
    -static DEFINE_PER_CPU(long, rcu_dyntick_snapshot);
    +#ifdef CONFIG_NO_HZ
    static DEFINE_PER_CPU(int, rcu_update_flag);

    /**
    * rcu_irq_enter - Called from Hard irq handlers and NMI/SMI.
    *
    * If the CPU was idle with dynamic ticks active, this updates the
    - * dynticks_progress_counter to let the RCU handling know that the
    + * rcu_dyntick_sched.dynticks to let the RCU handling know that the
    * CPU is active.
    */
    void rcu_irq_enter(void)
    {
    int cpu = smp_processor_id();
    + struct rcu_dyntick_sched *rdssp = &per_cpu(rcu_dyntick_sched, cpu);

    if (per_cpu(rcu_update_flag, cpu))
    per_cpu(rcu_update_flag, cpu)++;

    /*
    * Only update if we are coming from a stopped ticks mode
    - * (dynticks_progress_counter is even).
    + * (rcu_dyntick_sched.dynticks is even).
    */
    if (!in_interrupt() &&
    - (per_cpu(dynticks_progress_counter, cpu) & 0x1) == 0) {
    + (rdssp->dynticks & 0x1) == 0) {
    /*
    * The following might seem like we could have a race
    * with NMI/SMIs. But this really isn't a problem.
    @@ -461,12 +486,12 @@ void rcu_irq_enter(void)
    * RCU read-side critical sections on this CPU would
    * have already completed.
    */
    - per_cpu(dynticks_progress_counter, cpu)++;
    + rdssp->dynticks++;
    /*
    * The following memory barrier ensures that any
    * rcu_read_lock() primitives in the irq handler
    * are seen by other CPUs to follow the above
    - * increment to dynticks_progress_counter. This is
    + * increment to rcu_dyntick_sched.dynticks. This is
    * required in order for other CPUs to correctly
    * determine when it is safe to advance the RCU
    * grace-period state machine.
    @@ -474,7 +499,7 @@ void rcu_irq_enter(void)
    smp_mb(); /* see above block comment. */
    /*
    * Since we can't determine the dynamic tick mode from
    - * the dynticks_progress_counter after this routine,
    + * the rcu_dyntick_sched.dynticks after this routine,
    * we use a second flag to acknowledge that we came
    * from an idle state with ticks stopped.
    */
    @@ -482,7 +507,7 @@ void rcu_irq_enter(void)
    /*
    * If we take an NMI/SMI now, they will also increment
    * the rcu_update_flag, and will not update the
    - * dynticks_progress_counter on exit. That is for
    + * rcu_dyntick_sched.dynticks on exit. That is for
    * this IRQ to do.
    */
    }
    @@ -492,12 +517,13 @@ void rcu_irq_enter(void)
    * rcu_irq_exit - Called from exiting Hard irq context.
    *
    * If the CPU was idle with dynamic ticks active, update the
    - * dynticks_progress_counter to put let the RCU handling be
    + * rcu_dyntick_sched.dynticks to put let the RCU handling be
    * aware that the CPU is going back to idle with no ticks.
    */
    void rcu_irq_exit(void)
    {
    int cpu = smp_processor_id();
    + struct rcu_dyntick_sched *rdssp = &per_cpu(rcu_dyntick_sched, cpu);

    /*
    * rcu_update_flag is set if we interrupted the CPU
    @@ -505,7 +531,7 @@ void rcu_irq_exit(void)
    * Once this occurs, we keep track of interrupt nesting
    * because a NMI/SMI could also come in, and we still
    * only want the IRQ that started the increment of the
    - * dynticks_progress_counter to be the one that modifies
    + * rcu_dyntick_sched.dynticks to be the one that modifies
    * it on exit.
    */
    if (per_cpu(rcu_update_flag, cpu)) {
    @@ -517,28 +543,29 @@ void rcu_irq_exit(void)

    /*
    * If an NMI/SMI happens now we are still
    - * protected by the dynticks_progress_counter being odd.
    + * protected by the rcu_dyntick_sched.dynticks being odd.
    */

    /*
    * The following memory barrier ensures that any
    * rcu_read_unlock() primitives in the irq handler
    * are seen by other CPUs to preceed the following
    - * increment to dynticks_progress_counter. This
    + * increment to rcu_dyntick_sched.dynticks. This
    * is required in order for other CPUs to determine
    * when it is safe to advance the RCU grace-period
    * state machine.
    */
    smp_mb(); /* see above block comment. */
    - per_cpu(dynticks_progress_counter, cpu)++;
    - WARN_ON(per_cpu(dynticks_progress_counter, cpu) & 0x1);
    + rdssp->dynticks++;
    + WARN_ON(rdssp->dynticks & 0x1);
    }
    }

    static void dyntick_save_progress_counter(int cpu)
    {
    - per_cpu(rcu_dyntick_snapshot, cpu) =
    - per_cpu(dynticks_progress_counter, cpu);
    + struct rcu_dyntick_sched *rdssp = &per_cpu(rcu_dyntick_sched, cpu);
    +
    + rdssp->dynticks_snap = rdssp->dynticks;
    }

    static inline int
    @@ -546,9 +573,10 @@ rcu_try_flip_waitack_needed(int cpu)
    {
    long curr;
    long snap;
    + struct rcu_dyntick_sched *rdssp = &per_cpu(rcu_dyntick_sched, cpu);

    - curr = per_cpu(dynticks_progress_counter, cpu);
    - snap = per_cpu(rcu_dyntick_snapshot, cpu);
    + curr = rdssp->dynticks;
    + snap = rdssp->dynticks_snap;
    smp_mb(); /* force ordering with cpu entering/leaving dynticks. */

    /*
    @@ -582,9 +610,10 @@ rcu_try_flip_waitmb_needed(int cpu)
    {
    long curr;
    long snap;
    + struct rcu_dyntick_sched *rdssp = &per_cpu(rcu_dyntick_sched, cpu);

    - curr = per_cpu(dynticks_progress_counter, cpu);
    - snap = per_cpu(rcu_dyntick_snapshot, cpu);
    + curr = rdssp->dynticks;
    + snap = rdssp->dynticks_snap;
    smp_mb(); /* force ordering with cpu entering/leaving dynticks. */

    /*
    @@ -611,14 +640,88 @@ rcu_try_flip_waitmb_needed(int cpu)
    return 1;
    }

    +static void dyntick_save_progress_counter_sched(int cpu)
    +{
    + struct rcu_dyntick_sched *rdssp = &per_cpu(rcu_dyntick_sched, cpu);
    +
    + rdssp->sched_dynticks_snap = rdssp->dynticks;
    +}
    +
    +static inline int
    +rcu_qsctr_inc_needed_dyntick(int cpu)
    +{
    + long curr;
    + long snap;
    + struct rcu_dyntick_sched *rdssp = &per_cpu(rcu_dyntick_sched, cpu);
    +
    + curr = rdssp->dynticks;
    + snap = rdssp->sched_dynticks_snap;
    + smp_mb(); /* force ordering with cpu entering/leaving dynticks. */
    +
    + /*
    + * If the CPU remained in dynticks mode for the entire time
    + * and didn't take any interrupts, NMIs, SMIs, or whatever,
    + * then it cannot be in the middle of an rcu_read_lock(), so
    + * the next rcu_read_lock() it executes must use the new value
    + * of the counter. Therefore, this CPU has been in a quiescent
    + * state the entire time, and we don't need to wait for it.
    + */
    +
    + if ((curr == snap) && ((curr & 0x1) == 0))
    + return 0;
    +
    + /*
    + * If the CPU passed through or entered a dynticks idle phase with
    + * no active irq handlers, then, as above, this CPU has already
    + * passed through a quiescent state.
    + */
    +
    + if ((curr - snap) > 2 || (snap & 0x1) == 0)
    + return 0;
    +
    + /* We need this CPU to go through a quiescent state. */
    +
    + return 1;
    +}
    +
    #else /* !CONFIG_NO_HZ */

    -# define dyntick_save_progress_counter(cpu) do { } while (0)
    -# define rcu_try_flip_waitack_needed(cpu) (1)
    -# define rcu_try_flip_waitmb_needed(cpu) (1)
    +# define dyntick_save_progress_counter(cpu) do { } while (0)
    +# define rcu_try_flip_waitack_needed(cpu) (1)
    +# define rcu_try_flip_waitmb_needed(cpu) (1)
    +
    +# define dyntick_save_progress_counter_sched(cpu) do { } while (0)
    +# define rcu_qsctr_inc_needed_dyntick(cpu) (1)

    #endif /* CONFIG_NO_HZ */

    +static void save_qsctr_sched(int cpu)
    +{
    + struct rcu_dyntick_sched *rdssp = &per_cpu(rcu_dyntick_sched, cpu);
    +
    + rdssp->sched_qs_snap = rdssp->sched_qs;
    +}
    +
    +static inline int
    +rcu_qsctr_inc_needed(int cpu)
    +{
    + struct rcu_dyntick_sched *rdssp = &per_cpu(rcu_dyntick_sched, cpu);
    +
    + /*
    + * If there has been a quiescent state, no more need to wait
    + * on this CPU.
    + */
    +
    + if (rdssp->sched_qs != rdssp->sched_qs_snap) {
    + smp_mb(); /* force ordering with cpu entering schedule(). */
    + return 0;
    + }
    +
    + /* We need this CPU to go through a quiescent state. */
    +
    + return 1;
    +}
    +
    /*
    * Get here when RCU is idle. Decide whether we need to
    * move out of idle state, and return non-zero if so.
    @@ -821,6 +924,13 @@ void rcu_check_callbacks(int cpu, int us
    unsigned long flags;
    struct rcu_data *rdp = RCU_DATA_CPU(cpu);

    + if (user ||
    + (idle_cpu(cpu) && !in_softirq() &&
    + hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
    + smp_mb(); /* Guard against aggressive schedule(). */
    + rcu_qsctr_inc(cpu);
    + }
    +
    rcu_check_mb(cpu);
    if (rcu_ctrlblk.completed == rdp->completed)
    rcu_try_flip();
    @@ -871,6 +981,8 @@ void rcu_offline_cpu(int cpu)
    struct rcu_head *list = NULL;
    unsigned long flags;
    struct rcu_data *rdp = RCU_DATA_CPU(cpu);
    + struct rcu_head *schedlist = NULL;
    + struct rcu_head **schedtail = &schedlist;
    struct rcu_head **tail = &list;

    /*
    @@ -884,6 +996,11 @@ void rcu_offline_cpu(int cpu)
    rcu_offline_cpu_enqueue(rdp->waitlist[i], rdp->waittail[i],
    list, tail);
    rcu_offline_cpu_enqueue(rdp->nextlist, rdp->nexttail, list, tail);
    + rcu_offline_cpu_enqueue(rdp->waitschedlist, rdp->waitschedtail,
    + schedlist, schedtail);
    + rcu_offline_cpu_enqueue(rdp->nextschedlist, rdp->nextschedtail,
    + schedlist, schedtail);
    + rdp->rcu_sched_sleeping = 0;
    spin_unlock_irqrestore(&rdp->lock, flags);
    rdp->waitlistcount = 0;

    @@ -924,16 +1041,35 @@ void rcu_offline_cpu(int cpu)
    *rdp->nexttail = list;
    if (list)
    rdp->nexttail = tail;
    + *rdp->nextschedtail = schedlist;
    + if (schedlist)
    + rdp->nextschedtail = schedtail;
    spin_unlock_irqrestore(&rdp->lock, flags);
    }

    void __devinit rcu_online_cpu(int cpu)
    {
    unsigned long flags;
    + struct rcu_data *rdp;

    spin_lock_irqsave(&rcu_ctrlblk.fliplock, flags);
    cpu_set(cpu, rcu_cpu_online_map);
    spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, flags);
    +
    + /*
    + * The rcu_sched grace-period processing might have bypassed
    + * this CPU, given that it was not in the rcu_cpu_online_map
    + * when the grace-period scan started. This means that the
    + * grace-period task might sleep. So make sure that if this
    + * should happen, the first callback posted to this CPU will
    + * wake up the grace-period task if need be.
    + */
    +
    + local_irq_save(flags);
    + rdp = RCU_DATA_ME();
    + spin_lock(&rdp->lock);
    + rdp->rcu_sched_sleeping = 1;
    + spin_unlock_irqrestore(&rdp->lock, flags);
    }

    #else /* #ifdef CONFIG_HOTPLUG_CPU */
    @@ -993,26 +1129,194 @@ void call_rcu(struct rcu_head *head, voi
    }
    EXPORT_SYMBOL_GPL(call_rcu);

    +void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
    +{
    + unsigned long flags;
    + struct rcu_data *rdp;
    + int wake_gp = 0;
    +
    + head->func = func;
    + head->next = NULL;
    + local_irq_save(flags);
    + rdp = RCU_DATA_ME();
    + spin_lock(&rdp->lock);
    + *rdp->nextschedtail = head;
    + rdp->nextschedtail = &head->next;
    + if (rdp->rcu_sched_sleeping) {
    +
    + /* Grace-period processing might be sleeping... */
    +
    + rdp->rcu_sched_sleeping = 0;
    + wake_gp = 1;
    + }
    + spin_unlock(&rdp->lock);
    + local_irq_restore(flags);
    + if (wake_gp) {
    +
    + /* Wake up grace-period processing, unless someone beat us. */
    +
    + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleeping)
    + wake_gp = 0;
    + rcu_ctrlblk.sched_sleep = rcu_sched_not_sleeping;
    + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    + if (wake_gp)
    + wake_up_interruptible(&rcu_ctrlblk.sched_wq);
    + }
    +}
    +EXPORT_SYMBOL_GPL(call_rcu_sched);
    +
    /*
    * Wait until all currently running preempt_disable() code segments
    * (including hardware-irq-disable segments) complete. Note that
    * in -rt this does -not- necessarily result in all currently executing
    * interrupt -handlers- having completed.
    */
    -void __synchronize_sched(void)
    +synchronize_rcu_xxx(__synchronize_sched, call_rcu_sched)
    +EXPORT_SYMBOL_GPL(__synchronize_sched);
    +
    +/*
    + * kthread function that manages call_rcu_sched grace periods.
    + */
    +static int
    +rcu_sched_grace_period(void *arg)
    {
    - cpumask_t oldmask;
    + int couldsleep; /* might sleep after current pass. */
    + int couldsleepnext = 0; /* might sleep after next pass. */
    int cpu;
    + unsigned long flags;
    + struct rcu_data *rdp;
    + int ret;

    - if (sched_getaffinity(0, &oldmask) < 0)
    - oldmask = cpu_possible_map;
    - for_each_online_cpu(cpu) {
    - sched_setaffinity(0, cpumask_of_cpu(cpu));
    - schedule();
    - }
    - sched_setaffinity(0, oldmask);
    + /*
    + * Each pass through the following loop handles one
    + * rcu_sched grace period cycle.
    + */
    +
    + do {
    +
    + /* Save each CPU's current state. */
    +
    + for_each_online_cpu(cpu) {
    + dyntick_save_progress_counter_sched(cpu);
    + save_qsctr_sched(cpu);
    + }
    +
    + /*
    + * Sleep for about an RCU grace-period's worth to
    + * allow better batching and to consume less CPU.
    + */
    +
    + schedule_timeout_interruptible(HZ / 20);
    +
    + /*
    + * If there was nothing to do last time, prepare to
    + * sleep at the end of the current grace period cycle.
    + */
    +
    + couldsleep = couldsleepnext;
    + couldsleepnext = 1;
    + if (couldsleep) {
    + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    + rcu_ctrlblk.sched_sleep = rcu_sched_sleep_prep;
    + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    + }
    +
    + /*
    + * Wait on each CPU in turn to have either visited
    + * a quiescent state or been in dynticks-idle mode.
    + */
    +
    + for_each_online_cpu(cpu) {
    + while (rcu_qsctr_inc_needed(cpu) &&
    + rcu_qsctr_inc_needed_dyntick(cpu)) {
    + /* resched_cpu(cpu); */
    + schedule_timeout_interruptible(1);
    + }
    + }
    +
    + /*
    + * Advance callbacks for each CPU.
    + */
    +
    + for_each_online_cpu(cpu) {
    +
    + rdp = RCU_DATA_CPU(cpu);
    + spin_lock_irqsave(&rdp->lock, flags);
    +
    + /*
    + * We are running on this CPU irq-disabled, so no
    + * CPU can go offline until we re-enable irqs.
    + *
    + * Advance the callbacks! We share normal RCU's
    + * donelist, since callbacks are invoked the
    + * same way in either case.
    + */
    +
    + if (rdp->waitschedlist != NULL) {
    + *rdp->donetail = rdp->waitschedlist;
    + rdp->donetail = rdp->waitschedtail;
    +
    + /*
    + * Next rcu_check_callbacks() will
    + * do the required raise_softirq().
    + */
    + }
    + if (rdp->nextschedlist != NULL) {
    + rdp->waitschedlist = rdp->nextschedlist;
    + rdp->waitschedtail = rdp->nextschedtail;
    + couldsleep = 0;
    + couldsleepnext = 0;
    + } else {
    + rdp->waitschedlist = NULL;
    + rdp->waitschedtail = &rdp->waitschedlist;
    + }
    + rdp->nextschedlist = NULL;
    + rdp->nextschedtail = &rdp->nextschedlist;
    +
    + /* Mark sleep intention. */
    +
    + rdp->rcu_sched_sleeping = couldsleep;
    +
    + spin_unlock_irqrestore(&rdp->lock, flags);
    + }
    +
    + /* If we saw callbacks on the last scan, go deal with them. */
    +
    + if (!couldsleep)
    + continue;
    +
    + /* Attempt to block... */
    +
    + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleep_prep) {
    +
    + /*
    + * Someone posted a callback after we scanned.
    + * Go take care of it.
    + */
    +
    + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    + couldsleepnext = 0;
    + continue;
    + }
    +
    + /* Block until the next person posts a callback. */
    +
    + rcu_ctrlblk.sched_sleep = rcu_sched_sleeping;
    + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    + ret = 0;
    + __wait_event_interruptible(rcu_ctrlblk.sched_wq,
    + rcu_ctrlblk.sched_sleep != rcu_sched_sleeping,
    + ret);
    + if (ret)
    + flush_signals(current);
    + couldsleepnext = 0;
    +
    + } while (!kthread_should_stop());
    +
    + return (0);
    }
    -EXPORT_SYMBOL_GPL(__synchronize_sched);

    /*
    * Check to see if any future RCU-related work will need to be done
    @@ -1029,7 +1333,9 @@ int rcu_needs_cpu(int cpu)

    return (rdp->donelist != NULL ||
    !!rdp->waitlistcount ||
    - rdp->nextlist != NULL);
    + rdp->nextlist != NULL ||
    + rdp->nextschedlist != NULL ||
    + rdp->waitschedlist != NULL);
    }

    int rcu_pending(int cpu)
    @@ -1040,7 +1346,9 @@ int rcu_pending(int cpu)

    if (rdp->donelist != NULL ||
    !!rdp->waitlistcount ||
    - rdp->nextlist != NULL)
    + rdp->nextlist != NULL ||
    + rdp->nextschedlist != NULL ||
    + rdp->waitschedlist != NULL)
    return 1;

    /* The RCU core needs an acknowledgement from this CPU. */
    @@ -1107,6 +1415,11 @@ void __init __rcu_init(void)
    rdp->donetail = &rdp->donelist;
    rdp->rcu_flipctr[0] = 0;
    rdp->rcu_flipctr[1] = 0;
    + rdp->nextschedlist = NULL;
    + rdp->nextschedtail = &rdp->nextschedlist;
    + rdp->waitschedlist = NULL;
    + rdp->waitschedtail = &rdp->waitschedlist;
    + rdp->rcu_sched_sleeping = 0;
    }
    register_cpu_notifier(&rcu_nb);

    @@ -1129,6 +1442,18 @@ void __init __rcu_init(void)
    }

    /*
    + * Late-boot-time RCU initialization that must wait until after scheduler
    + * has been initialized.
    + */
    +void __init rcu_init_sched(void)
    +{
    + rcu_sched_grace_period_task = kthread_run(rcu_sched_grace_period,
    + NULL,
    + "rcu_sched_grace_period");
    + WARN_ON(IS_ERR(rcu_sched_grace_period_task));
    +}
    +
    +/*
    * Deprecated, use synchronize_rcu() or synchronize_sched() instead.
    */
    void synchronize_kernel(void)
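
    For orientation, a minimal usage sketch of the new primitive follows; the
    my_* names and active_probe are hypothetical, not part of the patch. A
    writer unlinks a structure that readers traverse under preempt_disable(),
    then defers the kfree() until every such reader has finished:

    #include <linux/rcupdate.h>	/* struct rcu_head, call_rcu_sched() */
    #include <linux/slab.h>	/* kfree() */

    struct my_probe {
            struct rcu_head rcu;
            void (*hook)(void);
    };

    static struct my_probe *active_probe;	/* read under preempt_disable() */

    static void my_probe_free(struct rcu_head *head)
    {
            kfree(container_of(head, struct my_probe, rcu));
    }

    static void my_probe_remove(struct my_probe *p)
    {
            /* Unlink so new preempt-disabled readers cannot find it... */
            rcu_assign_pointer(active_probe, NULL);
            /* ...then defer the free until all pre-existing readers are done. */
            call_rcu_sched(&p->rcu, my_probe_free);
    }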

  9. Re: [PATCH,RFC] Add call_rcu_sched()

    On Sun, 6 Apr 2008 14:37:19 -0700 "Paul E. McKenney" wrote:

    > Hello!
    >
    > Third cut of patch to provide the call_rcu_sched(). This is again to
    > synchronize_sched() as call_rcu() is to synchronize_rcu().
    >
    > Should be fine for experimental use, but not ready for inclusion.


    Let me know when to come out of hiding

    > Passes multi-hour rcutorture sessions with concurrent CPU hotplugging.
    >
    > Fixes since the first version include a bug that could result in
    > indefinite blocking (spotted by Gautham Shenoy), better resiliency
    > against CPU-hotplug operations, and other minor fixes.
    >
    > Fixes since the second version include reworking grace-period detection
    > to avoid deadlocks that could happen when running concurrently with
    > CPU hotplug, adding Mathieu's fix to avoid the softlockup messages,
    > as well as Mathieu's fix to allow use earlier in boot.
    >
    > Known/suspected shortcomings:
    >
    > o Only moderately tested on x86-64 and POWER -- a few hours of
    > rcutorture with concurrent CPU hotplugging. In particular, I
    > still do not trust the sleep/wakeup logic between call_rcu_sched()
    > and rcu_sched_grace_period().
    >
    > o Need to add call_rcu_sched() testing to rcutorture.
    >
    > o Still needs rcu_barrier_sched() -- intending to incorporate
    > the version Mathieu provided.
    >
    > This patch also fixes a long-standing bug in the earlier preemptable-RCU
    > implementation of synchronize_rcu() that could result in loss of
    > concurrent external changes to a task's CPU affinity mask. I still cannot
    > remember who reported this...
    >
    > ...
    >
    > +#define call_rcu_sched(head, func) call_rcu(head, func)
    > +
    > extern void __rcu_init(void);
    > +#define rcu_init_sched() do { } while (0)


    There are lots of creepy macros-which-probably-dont-need-to-be-macros in
    here.
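
    A hedged sketch of the non-macro alternative presumably being hinted at for
    the rcuclassic stubs; this is illustrative only, not code from the patch:

    static inline void call_rcu_sched(struct rcu_head *head,
                                      void (*func)(struct rcu_head *rcu))
    {
            call_rcu(head, func);
    }

    static inline void rcu_init_sched(void)
    {
    }

    Static inlines keep the pass-through/no-op behaviour while adding type
    checking of the arguments.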

    > +
    > +static inline int
    > +rcu_qsctr_inc_needed_dyntick(int cpu)


    Unneeded newline.

    > +{
    > + long curr;
    > + long snap;
    > + struct rcu_dyntick_sched *rdssp = &per_cpu(rcu_dyntick_sched, cpu);
    > +
    > + curr = rdssp->dynticks;
    > + snap = rdssp->sched_dynticks_snap;
    > + smp_mb(); /* force ordering with cpu entering/leaving dynticks. */
    > +
    > + /*
    > + * If the CPU remained in dynticks mode for the entire time
    > + * and didn't take any interrupts, NMIs, SMIs, or whatever,
    > + * then it cannot be in the middle of an rcu_read_lock(), so
    > + * the next rcu_read_lock() it executes must use the new value
    > + * of the counter. Therefore, this CPU has been in a quiescent
    > + * state the entire time, and we don't need to wait for it.
    > + */
    > +
    > + if ((curr == snap) && ((curr & 0x1) == 0))
    > + return 0;
    > +
    > + /*
    > + * If the CPU passed through or entered a dynticks idle phase with
    > + * no active irq handlers, then, as above, this CPU has already
    > + * passed through a quiescent state.
    > + */
    > +
    > + if ((curr - snap) > 2 || (snap & 0x1) == 0)
    > + return 0;
    > +
    > + /* We need this CPU to go through a quiescent state. */
    > +
    > + return 1;
    > +}


    That's a pretty big inline. It only has a single callsite so the compiler
    should inline it for us. And if it grows a second callsite, the inlining
    is probably wrong.

    > +static inline int
    > +rcu_qsctr_inc_needed(int cpu)


    Unneeded newline.

    > /*
    > * Get here when RCU is idle. Decide whether we need to
    > * move out of idle state, and return non-zero if so.
    > @@ -821,6 +924,13 @@ void rcu_check_callbacks(int cpu, int us
    > unsigned long flags;
    > struct rcu_data *rdp = RCU_DATA_CPU(cpu);
    >
    > + if (user ||
    > + (idle_cpu(cpu) && !in_softirq() &&
    > + hardirq_count() <= (1 << HARDIRQ_SHIFT))) {


    I think this test could do with a bigfatcomment explaining what it is doing.

    > + smp_mb(); /* Guard against aggressive schedule(). */
    > + rcu_qsctr_inc(cpu);
    > + }
    > +
    > rcu_check_mb(cpu);
    > if (rcu_ctrlblk.completed == rdp->completed)
    > rcu_try_flip();
    >
    > ...
    >
    > +
    > + /*
    > + * The rcu_sched grace-period processing might have bypassed
    > + * this CPU, given that it was not in the rcu_cpu_online_map
    > + * when the grace-period scan started. This means that the
    > + * grace-period task might sleep. So make sure that if this
    > + * should happen, the first callback posted to this CPU will
    > + * wake up the grace-period task if need be.
    > + */
    > +
    > + local_irq_save(flags);
    > + rdp = RCU_DATA_ME();
    > + spin_lock(&rdp->lock);


    I assume that splitting the irq-disable from the spin_lock is a little
    latency optimisation?

    > + rdp->rcu_sched_sleeping = 1;
    > + spin_unlock_irqrestore(&rdp->lock, flags);
    > }
    >
    > #else /* #ifdef CONFIG_HOTPLUG_CPU */
    > @@ -993,26 +1129,194 @@ void call_rcu(struct rcu_head *head, voi
    > }
    > EXPORT_SYMBOL_GPL(call_rcu);
    >
    > +void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
    > +{
    > + unsigned long flags;
    > + struct rcu_data *rdp;
    > + int wake_gp = 0;
    > +
    > + head->func = func;
    > + head->next = NULL;
    > + local_irq_save(flags);
    > + rdp = RCU_DATA_ME();
    > + spin_lock(&rdp->lock);
    > + *rdp->nextschedtail = head;
    > + rdp->nextschedtail = &head->next;
    > + if (rdp->rcu_sched_sleeping) {
    > +
    > + /* Grace-period processing might be sleeping... */
    > +
    > + rdp->rcu_sched_sleeping = 0;
    > + wake_gp = 1;
    > + }
    > + spin_unlock(&rdp->lock);
    > + local_irq_restore(flags);


    spin_unlock_irqrestore() here would be consistent with the above.

    > + if (wake_gp) {
    > +
    > + /* Wake up grace-period processing, unless someone beat us. */
    > +
    > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);


    If wake_gp!=0 is common then we could microoptimise straight-line
    performance here by retaining the irq-offness from above.

    > + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleeping)
    > + wake_gp = 0;
    > + rcu_ctrlblk.sched_sleep = rcu_sched_not_sleeping;
    > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > + if (wake_gp)
    > + wake_up_interruptible(&rcu_ctrlblk.sched_wq);
    > + }
    > +}
    > +EXPORT_SYMBOL_GPL(call_rcu_sched);
    >
    > ...
    >
    > +static int
    > +rcu_sched_grace_period(void *arg)


    Unneeded newline.

    > {
    > - cpumask_t oldmask;
    > + int couldsleep; /* might sleep after current pass. */
    > + int couldsleepnext = 0; /* might sleep after next pass. */
    > int cpu;
    > + unsigned long flags;
    > + struct rcu_data *rdp;
    > + int ret;
    >
    > - if (sched_getaffinity(0, &oldmask) < 0)
    > - oldmask = cpu_possible_map;
    > - for_each_online_cpu(cpu) {
    > - sched_setaffinity(0, cpumask_of_cpu(cpu));
    > - schedule();
    > - }
    > - sched_setaffinity(0, oldmask);
    > + /*
    > + * Each pass through the following loop handles one
    > + * rcu_sched grace period cycle.
    > + */
    > +
    > + do {
    > +
    > + /* Save each CPU's current state. */
    > +
    > + for_each_online_cpu(cpu) {


    Numerous unneeded newline

    > + dyntick_save_progress_counter_sched(cpu);
    > + save_qsctr_sched(cpu);
    > + }
    > +
    > + /*
    > + * Sleep for about an RCU grace-period's worth to
    > + * allow better batching and to consume less CPU.
    > + */
    > +
    > + schedule_timeout_interruptible(HZ / 20);


    eek, a magic number.
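
    One hedged way to de-magic it (the identifier is made up, not from the patch):

    /* HZ/20 jiffies is roughly 50 ms regardless of HZ; give it a name. */
    #define RCU_SCHED_BATCH_TIME (HZ / 20)

    The kthread would then sleep with
    schedule_timeout_interruptible(RCU_SCHED_BATCH_TIME).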

    > + /*
    > + * If there was nothing to do last time, prepare to
    > + * sleep at the end of the current grace period cycle.
    > + */
    > +
    > + couldsleep = couldsleepnext;
    > + couldsleepnext = 1;
    > + if (couldsleep) {
    > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    > + rcu_ctrlblk.sched_sleep = rcu_sched_sleep_prep;
    > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > + }


    > Is the above locking actually correct and needed? The write to
    rcu_ctrlblk.sched_sleep is a single word...

    > + /*
    > + * Wait on each CPU in turn to have either visited
    > + * a quiescent state or been in dynticks-idle mode.
    > + */
    > +
    > + for_each_online_cpu(cpu) {
    > + while (rcu_qsctr_inc_needed(cpu) &&
    > + rcu_qsctr_inc_needed_dyntick(cpu)) {
    > + /* resched_cpu(cpu); */
    > + schedule_timeout_interruptible(1);
    > + }
    > + }
    > +
    > + /*
    > + * Advance callbacks for each CPU.
    > + */
    > +
    > + for_each_online_cpu(cpu) {


    It's more conventional to omit the blank line after the above form of
    comment block.

    > + rdp = RCU_DATA_CPU(cpu);
    > + spin_lock_irqsave(&rdp->lock, flags);
    > +
    > + /*
    > + * We are running on this CPU irq-disabled, so no
    > + * CPU can go offline until we re-enable irqs.


    but, but, but. The cpu at `cpu' could have gone offline just before we
    disabled local interrupts.

    > + * Advance the callbacks! We share normal RCU's
    > + * donelist, since callbacks are invoked the
    > + * same way in either case.
    > + */
    > +
    > + if (rdp->waitschedlist != NULL) {
    > + *rdp->donetail = rdp->waitschedlist;
    > + rdp->donetail = rdp->waitschedtail;
    > +
    > + /*
    > + * Next rcu_check_callbacks() will
    > + * do the required raise_softirq().
    > + */
    > + }
    > + if (rdp->nextschedlist != NULL) {
    > + rdp->waitschedlist = rdp->nextschedlist;
    > + rdp->waitschedtail = rdp->nextschedtail;
    > + couldsleep = 0;
    > + couldsleepnext = 0;
    > + } else {
    > + rdp->waitschedlist = NULL;
    > + rdp->waitschedtail = &rdp->waitschedlist;
    > + }
    > + rdp->nextschedlist = NULL;
    > + rdp->nextschedtail = &rdp->nextschedlist;
    > +
    > + /* Mark sleep intention. */
    > +
    > + rdp->rcu_sched_sleeping = couldsleep;
    > +
    > + spin_unlock_irqrestore(&rdp->lock, flags);
    > + }
    > +
    > + /* If we saw callbacks on the last scan, go deal with them. */
    > +
    > + if (!couldsleep)
    > + continue;
    > +
    > + /* Attempt to block... */
    > +
    > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    > + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleep_prep) {
    > +
    > + /*
    > + * Someone posted a callback after we scanned.
    > + * Go take care of it.
    > + */
    > +
    > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > + couldsleepnext = 0;
    > + continue;
    > + }
    > +
    > + /* Block until the next person posts a callback. */
    > +
    > + rcu_ctrlblk.sched_sleep = rcu_sched_sleeping;
    > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > + ret = 0;
    > + __wait_event_interruptible(rcu_ctrlblk.sched_wq,
    > + rcu_ctrlblk.sched_sleep != rcu_sched_sleeping,
    > + ret);
    > + if (ret)
    > + flush_signals(current);


    That flush_signals() was a surprise. A desurprising comment would be nice.
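
    A hedged guess at such a comment (my reading of the code, not the author's
    explanation):

    /*
     * This kthread has no use for signals, but a pending one would keep
     * its interruptible waits from ever sleeping again, so discard it.
     */
    if (ret)
            flush_signals(current);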

    > + couldsleepnext = 0;
    > +
    > + } while (!kthread_should_stop());
    > +
    > + return (0);
    > }
    > -EXPORT_SYMBOL_GPL(__synchronize_sched);
    >
    > /*
    > * Check to see if any future RCU-related work will need to be done
    > @@ -1029,7 +1333,9 @@ int rcu_needs_cpu(int cpu)
    >
    > return (rdp->donelist != NULL ||
    > !!rdp->waitlistcount ||
    > - rdp->nextlist != NULL);
    > + rdp->nextlist != NULL ||
    > + rdp->nextschedlist != NULL ||
    > + rdp->waitschedlist != NULL);
    > }
    >
    > int rcu_pending(int cpu)
    > @@ -1040,7 +1346,9 @@ int rcu_pending(int cpu)
    >
    > if (rdp->donelist != NULL ||
    > !!rdp->waitlistcount ||
    > - rdp->nextlist != NULL)
    > + rdp->nextlist != NULL ||
    > + rdp->nextschedlist != NULL ||
    > + rdp->waitschedlist != NULL)
    > return 1;
    >
    > /* The RCU core needs an acknowledgement from this CPU. */
    > @@ -1107,6 +1415,11 @@ void __init __rcu_init(void)
    > rdp->donetail = &rdp->donelist;
    > rdp->rcu_flipctr[0] = 0;
    > rdp->rcu_flipctr[1] = 0;
    > + rdp->nextschedlist = NULL;
    > + rdp->nextschedtail = &rdp->nextschedlist;
    > + rdp->waitschedlist = NULL;
    > + rdp->waitschedtail = &rdp->waitschedlist;
    > + rdp->rcu_sched_sleeping = 0;
    > }
    > register_cpu_notifier(&rcu_nb);
    >
    > @@ -1129,6 +1442,18 @@ void __init __rcu_init(void)
    > }
    >
    > /*
    > + * Late-boot-time RCU initialization that must wait until after scheduler
    > + * has been initialized.
    > + */
    > +void __init rcu_init_sched(void)
    > +{
    > + rcu_sched_grace_period_task = kthread_run(rcu_sched_grace_period,
    > + NULL,
    > + "rcu_sched_grace_period");
    > + WARN_ON(IS_ERR(rcu_sched_grace_period_task));
    > +}
    > +
    > +/*
    > * Deprecated, use synchronize_rcu() or synchronize_sched() instead.
    > */
    > void synchronize_kernel(void)


    I suspect I don't understand any of the RCU code any more.


  10. Re: [PATCH,RFC] Add call_rcu_sched()

    On Tue, Apr 08, 2008 at 12:34:49AM -0700, Andrew Morton wrote:
    > On Sun, 6 Apr 2008 14:37:19 -0700 "Paul E. McKenney" wrote:
    >
    > > Hello!
    > >
    > > Third cut of patch to provide the call_rcu_sched(). This is again to
    > > synchronize_sched() as call_rcu() is to synchronize_rcu().
    > >
    > > Should be fine for experimental use, but not ready for inclusion.

    >
    > Let me know when to come out of hiding
    >
    > > Passes multi-hour rcutorture sessions with concurrent CPU hotplugging.
    > >
    > > Fixes since the first version include a bug that could result in
    > > indefinite blocking (spotted by Gautham Shenoy), better resiliency
    > > against CPU-hotplug operations, and other minor fixes.
    > >
    > > Fixes since the second version include reworking grace-period detection
    > > to avoid deadlocks that could happen when running concurrently with
    > > CPU hotplug, adding Mathieu's fix to avoid the softlockup messages,
    > > as well as Mathieu's fix to allow use earlier in boot.
    > >
    > > Known/suspected shortcomings:
    > >
    > > o Only moderately tested on x86-64 and POWER -- a few hours of
    > > rcutorture with concurrent CPU hotplugging. In particular, I
    > > still do not trust the sleep/wakeup logic between call_rcu_sched()
    > > and rcu_sched_grace_period().
    > >
    > > o Need to add call_rcu_sched() testing to rcutorture.
    > >
    > > o Still needs rcu_barrier_sched() -- intending to incorporate
    > > the version Mathieu provided.
    > >
    > > This patch also fixes a long-standing bug in the earlier preemptable-RCU
    > > implementation of synchronize_rcu() that could result in loss of
    > > concurrent external changes to a task's CPU affinity mask. I still cannot
    > > remember who reported this...
    > >
    > > ...
    > >
    > > +#define call_rcu_sched(head, func) call_rcu(head, func)
    > > +
    > > extern void __rcu_init(void);
    > > +#define rcu_init_sched() do { } while (0)

    >
    > There are lots of creepy macros-which-probably-dont-need-to-be-macros in
    > here.
    >
    > > +
    > > +static inline int
    > > +rcu_qsctr_inc_needed_dyntick(int cpu)

    >
    > Unneeded newline.
    >
    > > +{
    > > + long curr;
    > > + long snap;
    > > + struct rcu_dyntick_sched *rdssp = &per_cpu(rcu_dyntick_sched, cpu);
    > > +
    > > + curr = rdssp->dynticks;
    > > + snap = rdssp->sched_dynticks_snap;
    > > + smp_mb(); /* force ordering with cpu entering/leaving dynticks. */
    > > +
    > > + /*
    > > + * If the CPU remained in dynticks mode for the entire time
    > > + * and didn't take any interrupts, NMIs, SMIs, or whatever,
    > > + * then it cannot be in the middle of an rcu_read_lock(), so
    > > + * the next rcu_read_lock() it executes must use the new value
    > > + * of the counter. Therefore, this CPU has been in a quiescent
    > > + * state the entire time, and we don't need to wait for it.
    > > + */
    > > +
    > > + if ((curr == snap) && ((curr & 0x1) == 0))
    > > + return 0;
    > > +
    > > + /*
    > > + * If the CPU passed through or entered a dynticks idle phase with
    > > + * no active irq handlers, then, as above, this CPU has already
    > > + * passed through a quiescent state.
    > > + */
    > > +
    > > + if ((curr - snap) > 2 || (snap & 0x1) == 0)
    > > + return 0;
    > > +
    > > + /* We need this CPU to go through a quiescent state. */
    > > +
    > > + return 1;
    > > +}

    >
    > That's a pretty big inline. It only has a single callsite so the compiler
    > should inline it for us. And if it grows a second callsite, the inlining
    > is probably wrong.
    >
    > > +static inline int
    > > +rcu_qsctr_inc_needed(int cpu)

    >
    > Unneeded newline.
    >
    > > /*
    > > * Get here when RCU is idle. Decide whether we need to
    > > * move out of idle state, and return non-zero if so.
    > > @@ -821,6 +924,13 @@ void rcu_check_callbacks(int cpu, int us
    > > unsigned long flags;
    > > struct rcu_data *rdp = RCU_DATA_CPU(cpu);
    > >
    > > + if (user ||
    > > + (idle_cpu(cpu) && !in_softirq() &&
    > > + hardirq_count() <= (1 << HARDIRQ_SHIFT))) {

    >
    > I think this test could do with a bigfatcomment explaining what it is doing.
    >
    > > + smp_mb(); /* Guard against aggressive schedule(). */
    > > + rcu_qsctr_inc(cpu);
    > > + }
    > > +
    > > rcu_check_mb(cpu);
    > > if (rcu_ctrlblk.completed == rdp->completed)
    > > rcu_try_flip();
    > >
    > > ...
    > >
    > > +
    > > + /*
    > > + * The rcu_sched grace-period processing might have bypassed
    > > + * this CPU, given that it was not in the rcu_cpu_online_map
    > > + * when the grace-period scan started. This means that the
    > > + * grace-period task might sleep. So make sure that if this
    > > + * should happen, the first callback posted to this CPU will
    > > + * wake up the grace-period task if need be.
    > > + */
    > > +
    > > + local_irq_save(flags);
    > > + rdp = RCU_DATA_ME();
    > > + spin_lock(&rdp->lock);

    >
    > I assume that splitting the irq-disable from the spin_lock is a little
    > latency optimisation?


    RCU_DATA_ME() eventually calls smp_processor_id(), which yells if we
    call it from an irq-enabled or a preempt-enabled context. Hence the
    splitting.
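
    For readers following along, a sketch of the hazard being avoided; the
    function below is hypothetical and intentionally broken:

    static void broken_mark_sleeping(void)
    {
            unsigned long flags;
            struct rcu_data *rdp;

            rdp = RCU_DATA_ME();    /* smp_processor_id() with preemption
                                       enabled: debug warning, and we may
                                       migrate right after the lookup. */
            spin_lock_irqsave(&rdp->lock, flags);
            rdp->rcu_sched_sleeping = 1;    /* possibly the wrong CPU's flag */
            spin_unlock_irqrestore(&rdp->lock, flags);
    }

    Disabling irqs before the lookup, as the patch does, keeps the task on the
    CPU whose rcu_data is about to be locked.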

    >
    > > + rdp->rcu_sched_sleeping = 1;
    > > + spin_unlock_irqrestore(&rdp->lock, flags);
    > > }
    > >
    > > #else /* #ifdef CONFIG_HOTPLUG_CPU */
    > > @@ -993,26 +1129,194 @@ void call_rcu(struct rcu_head *head, voi
    > > }
    > > EXPORT_SYMBOL_GPL(call_rcu);
    > >
    > > +void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
    > > +{
    > > + unsigned long flags;
    > > + struct rcu_data *rdp;
    > > + int wake_gp = 0;
    > > +
    > > + head->func = func;
    > > + head->next = NULL;
    > > + local_irq_save(flags);
    > > + rdp = RCU_DATA_ME();
    > > + spin_lock(&rdp->lock);
    > > + *rdp->nextschedtail = head;
    > > + rdp->nextschedtail = &head->next;
    > > + if (rdp->rcu_sched_sleeping) {
    > > +
    > > + /* Grace-period processing might be sleeping... */
    > > +
    > > + rdp->rcu_sched_sleeping = 0;
    > > + wake_gp = 1;
    > > + }
    > > + spin_unlock(&rdp->lock);
    > > + local_irq_restore(flags);

    >
    > spin_unlock_irqrestore() here would be consistent with the above.
    >
    > > + if (wake_gp) {
    > > +
    > > + /* Wake up grace-period processing, unless someone beat us. */
    > > +
    > > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);

    >
    > If wake_gp!=0 is common then we could microoptimise straight-line
    > performance here by retaining the irq-offness from above.
    >
    > > + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleeping)
    > > + wake_gp = 0;
    > > + rcu_ctrlblk.sched_sleep = rcu_sched_not_sleeping;
    > > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > > + if (wake_gp)
    > > + wake_up_interruptible(&rcu_ctrlblk.sched_wq);
    > > + }
    > > +}
    > > +EXPORT_SYMBOL_GPL(call_rcu_sched);
    > >
    > > ...
    > >
    > > +static int
    > > +rcu_sched_grace_period(void *arg)

    >
    > Unneeded newline.
    >
    > > {
    > > - cpumask_t oldmask;
    > > + int couldsleep; /* might sleep after current pass. */
    > > + int couldsleepnext = 0; /* might sleep after next pass. */
    > > int cpu;
    > > + unsigned long flags;
    > > + struct rcu_data *rdp;
    > > + int ret;
    > >
    > > - if (sched_getaffinity(0, &oldmask) < 0)
    > > - oldmask = cpu_possible_map;
    > > - for_each_online_cpu(cpu) {
    > > - sched_setaffinity(0, cpumask_of_cpu(cpu));
    > > - schedule();
    > > - }
    > > - sched_setaffinity(0, oldmask);
    > > + /*
    > > + * Each pass through the following loop handles one
    > > + * rcu_sched grace period cycle.
    > > + */
    > > +
    > > + do {
    > > +
    > > + /* Save each CPU's current state. */
    > > +
    > > + for_each_online_cpu(cpu) {

    >
    > Numerous unneeded newline
    >
    > > + dyntick_save_progress_counter_sched(cpu);
    > > + save_qsctr_sched(cpu);
    > > + }
    > > +
    > > + /*
    > > + * Sleep for about an RCU grace-period's worth to
    > > + * allow better batching and to consume less CPU.
    > > + */
    > > +
    > > + schedule_timeout_interruptible(HZ / 20);

    >
    > eek, a magic number.
    >
    > > + /*
    > > + * If there was nothing to do last time, prepare to
    > > + * sleep at the end of the current grace period cycle.
    > > + */
    > > +
    > > + couldsleep = couldsleepnext;
    > > + couldsleepnext = 1;
    > > + if (couldsleep) {
    > > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    > > + rcu_ctrlblk.sched_sleep = rcu_sched_sleep_prep;
    > > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > > + }

    >
    > > Is the above locking actually correct and needed? The write to
    > rcu_ctrlblk.sched_sleep is a single word...
    >
    > > + /*
    > > + * Wait on each CPU in turn to have either visited
    > > + * a quiescent state or been in dynticks-idle mode.
    > > + */
    > > +
    > > + for_each_online_cpu(cpu) {
    > > + while (rcu_qsctr_inc_needed(cpu) &&
    > > + rcu_qsctr_inc_needed_dyntick(cpu)) {
    > > + /* resched_cpu(cpu); */
    > > + schedule_timeout_interruptible(1);
    > > + }
    > > + }
    > > +
    > > + /*
    > > + * Advance callbacks for each CPU.
    > > + */
    > > +
    > > + for_each_online_cpu(cpu) {

    >
    > It's more conventional to omit the blank line after the above form of
    > comment block.
    >
    > > + rdp = RCU_DATA_CPU(cpu);
    > > + spin_lock_irqsave(&rdp->lock, flags);
    > > +
    > > + /*
    > > + * We are running on this CPU irq-disabled, so no
    > > + * CPU can go offline until we re-enable irqs.

    >
    > but, but, but. The cpu at `cpu' could have gone offline just before we
    > disabled local interrupts.


    In that case the CPU_DEAD callback should have migrated the rcu-lists to
    a cpu which is online.

    >
    > > + * Advance the callbacks! We share normal RCU's
    > > + * donelist, since callbacks are invoked the
    > > + * same way in either case.
    > > + */
    > > +
    > > + if (rdp->waitschedlist != NULL) {
    > > + *rdp->donetail = rdp->waitschedlist;
    > > + rdp->donetail = rdp->waitschedtail;
    > > +
    > > + /*
    > > + * Next rcu_check_callbacks() will
    > > + * do the required raise_softirq().
    > > + */
    > > + }
    > > + if (rdp->nextschedlist != NULL) {
    > > + rdp->waitschedlist = rdp->nextschedlist;
    > > + rdp->waitschedtail = rdp->nextschedtail;
    > > + couldsleep = 0;
    > > + couldsleepnext = 0;
    > > + } else {
    > > + rdp->waitschedlist = NULL;
    > > + rdp->waitschedtail = &rdp->waitschedlist;
    > > + }
    > > + rdp->nextschedlist = NULL;
    > > + rdp->nextschedtail = &rdp->nextschedlist;
    > > +
    > > + /* Mark sleep intention. */
    > > +
    > > + rdp->rcu_sched_sleeping = couldsleep;
    > > +
    > > + spin_unlock_irqrestore(&rdp->lock, flags);
    > > + }
    > > +
    > > + /* If we saw callbacks on the last scan, go deal with them. */
    > > +
    > > + if (!couldsleep)
    > > + continue;
    > > +
    > > + /* Attempt to block... */
    > > +
    > > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    > > + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleep_prep) {
    > > +
    > > + /*
    > > + * Someone posted a callback after we scanned.
    > > + * Go take care of it.
    > > + */
    > > +
    > > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > > + couldsleepnext = 0;
    > > + continue;
    > > + }
    > > +
    > > + /* Block until the next person posts a callback. */
    > > +
    > > + rcu_ctrlblk.sched_sleep = rcu_sched_sleeping;
    > > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > > + ret = 0;
    > > + __wait_event_interruptible(rcu_ctrlblk.sched_wq,
    > > + rcu_ctrlblk.sched_sleep != rcu_sched_sleeping,
    > > + ret);
    > > + if (ret)
    > > + flush_signals(current);

    >
    > That flush_signals() was a surprise. A desurprising comment would be nice.
    >
    > > + couldsleepnext = 0;
    > > +
    > > + } while (!kthread_should_stop());
    > > +
    > > + return (0);
    > > }
    > > -EXPORT_SYMBOL_GPL(__synchronize_sched);
    > >
    > > /*
    > > * Check to see if any future RCU-related work will need to be done
    > > @@ -1029,7 +1333,9 @@ int rcu_needs_cpu(int cpu)
    > >
    > > return (rdp->donelist != NULL ||
    > > !!rdp->waitlistcount ||
    > > - rdp->nextlist != NULL);
    > > + rdp->nextlist != NULL ||
    > > + rdp->nextschedlist != NULL ||
    > > + rdp->waitschedlist != NULL);
    > > }
    > >
    > > int rcu_pending(int cpu)
    > > @@ -1040,7 +1346,9 @@ int rcu_pending(int cpu)
    > >
    > > if (rdp->donelist != NULL ||
    > > !!rdp->waitlistcount ||
    > > - rdp->nextlist != NULL)
    > > + rdp->nextlist != NULL ||
    > > + rdp->nextschedlist != NULL ||
    > > + rdp->waitschedlist != NULL)
    > > return 1;
    > >
    > > /* The RCU core needs an acknowledgement from this CPU. */
    > > @@ -1107,6 +1415,11 @@ void __init __rcu_init(void)
    > > rdp->donetail = &rdp->donelist;
    > > rdp->rcu_flipctr[0] = 0;
    > > rdp->rcu_flipctr[1] = 0;
    > > + rdp->nextschedlist = NULL;
    > > + rdp->nextschedtail = &rdp->nextschedlist;
    > > + rdp->waitschedlist = NULL;
    > > + rdp->waitschedtail = &rdp->waitschedlist;
    > > + rdp->rcu_sched_sleeping = 0;
    > > }
    > > register_cpu_notifier(&rcu_nb);
    > >
    > > @@ -1129,6 +1442,18 @@ void __init __rcu_init(void)
    > > }
    > >
    > > /*
    > > + * Late-boot-time RCU initialization that must wait until after scheduler
    > > + * has been initialized.
    > > + */
    > > +void __init rcu_init_sched(void)
    > > +{
    > > + rcu_sched_grace_period_task = kthread_run(rcu_sched_grace_period,
    > > + NULL,
    > > + "rcu_sched_grace_period");
    > > + WARN_ON(IS_ERR(rcu_sched_grace_period_task));
    > > +}
    > > +
    > > +/*
    > > * Deprecated, use synchronize_rcu() or synchronize_sched() instead.
    > > */
    > > void synchronize_kernel(void)

    >
    > I suspect I don't understand any of the RCU code any more.



    >


    --
    Thanks and Regards
    gautham

  11. Re: [PATCH,RFC] Add call_rcu_sched()

    On Tue, 8 Apr 2008 13:40:48 +0530 Gautham R Shenoy wrote:

    > > > + rdp = RCU_DATA_CPU(cpu);

    > <-- here ------\
    > > > + spin_lock_irqsave(&rdp->lock, flags); |
    > > > + |
    > > > + /* |
    > > > + * We are running on this CPU irq-disabled, so no |
    > > > + * CPU can go offline until we re-enable irqs. |

    > > |
    > > but, but, but. The cpu at `cpu' could have gone offline just before we |
    > > disabled local interrupts. |

    > |
    > In that case the CPU_DEAD callback should have migrated the rcu-lists to |
    > a cpu which is online. |

    |
    But local variable rdp might be pointing at the now-offlined CPU's data? -------/

  12. Re: [PATCH,RFC] Add call_rcu_sched()

    On Tue, Apr 08, 2008 at 01:39:36AM -0700, Andrew Morton wrote:
    > On Tue, 8 Apr 2008 13:40:48 +0530 Gautham R Shenoy wrote:
    >
    > > > > + rdp = RCU_DATA_CPU(cpu);

    > > <-- here ------\
    > > > > + spin_lock_irqsave(&rdp->lock, flags); |
    > > > > + |
    > > > > + /* |
    > > > > + * We are running on this CPU irq-disabled, so no |
    > > > > + * CPU can go offline until we re-enable irqs. |
    > > > |
    > > > but, but, but. The cpu at `cpu' could have gone offline just before we |
    > > > disabled local interrupts. |

    > > |
    > > In that case the CPU_DEAD callback should have migrated the rcu-lists to |
    > > a cpu which is online. |

    > |
    > But local variable rdp might be pointing at the now-offlined CPU's data? -------/


    Right. But then rdp wouldn't contain anything useful at this point.
    So, we may only end up taking the rdp->lock, observe that there's nothing to do,
    and move on.

    Is there something else that I am missing?

    --
    Thanks and Regards
    gautham

  13. Re: [PATCH,RFC] Add call_rcu_sched()

    On Tue, 8 Apr 2008 14:26:37 +0530 Gautham R Shenoy wrote:

    > On Tue, Apr 08, 2008 at 01:39:36AM -0700, Andrew Morton wrote:
    > > On Tue, 8 Apr 2008 13:40:48 +0530 Gautham R Shenoy wrote:
    > >
    > > > > > + rdp = RCU_DATA_CPU(cpu);
    > > > <-- here ------\
    > > > > > + spin_lock_irqsave(&rdp->lock, flags); |
    > > > > > + |
    > > > > > + /* |
    > > > > > + * We are running on this CPU irq-disabled, so no |
    > > > > > + * CPU can go offline until we re-enable irqs. |
    > > > > |
    > > > > but, but, but. The cpu at `cpu' could have gone offline just before we |
    > > > > disabled local interrupts. |
    > > > |
    > > > In that case the CPU_DEAD callback should have migrated the rcu-lists to |
    > > > a cpu which is online. |

    > > |
    > > But local variable rdp might be pointing at the now-offlined CPU's data? -------/

    >
    > Right. But then rdp wouldn't contain anything useful at this point.
    > So, we may only end up taking the rdp->lock, observe that there's nothing to do,
    > and move on.
    >
    > Is there something else that I am missing?


    erm, I guess that'll work OK.

    There were intentions to release the per-cpu memory during unplug, but
    nobody has threatened to do that for a while.


  14. Re: [PATCH,RFC] Add call_rcu_sched()

    On Tue, Apr 08, 2008 at 12:34:49AM -0700, Andrew Morton wrote:
    > On Sun, 6 Apr 2008 14:37:19 -0700 "Paul E. McKenney" wrote:
    >
    > > Hello!
    > >
    > > Third cut of patch to provide the call_rcu_sched(). This is again to
    > > synchronize_sched() as call_rcu() is to synchronize_rcu().
    > >
    > > Should be fine for experimental use, but not ready for inclusion.

    >
    > Let me know when to come out of hiding


    ;-)

    > > Passes multi-hour rcutorture sessions with concurrent CPU hotplugging.
    > >
    > > Fixes since the first version include a bug that could result in
    > > indefinite blocking (spotted by Gautham Shenoy), better resiliency
    > > against CPU-hotplug operations, and other minor fixes.
    > >
    > > Fixes since the second version include reworking grace-period detection
    > > to avoid deadlocks that could happen when running concurrently with
    > > CPU hotplug, adding Mathieu's fix to avoid the softlockup messages,
    > > as well as Mathieu's fix to allow use earlier in boot.
    > >
    > > Known/suspected shortcomings:
    > >
    > > o Only moderately tested on x86-64 and POWER -- a few hours of
    > > rcutorture with concurrent CPU hotplugging. In particular, I
    > > still do not trust the sleep/wakeup logic between call_rcu_sched()
    > > and rcu_sched_grace_period().
    > >
    > > o Need to add call_rcu_sched() testing to rcutorture.
    > >
    > > o Still needs rcu_barrier_sched() -- intending to incorporate
    > > the version Mathieu provided.
    > >
    > > This patch also fixes a long-standing bug in the earlier preemptable-RCU
    > > implementation of synchronize_rcu() that could result in loss of
    > > concurrent external changes to a task's CPU affinity mask. I still cannot
    > > remember who reported this...
    > >
    > > ...
    > >
    > > +#define call_rcu_sched(head, func) call_rcu(head, func)
    > > +
    > > extern void __rcu_init(void);
    > > +#define rcu_init_sched() do { } while (0)

    >
    > There are lots of creepy macros-which-probably-dont-need-to-be-macros in
    > here.
    >
    > > +
    > > +static inline int
    > > +rcu_qsctr_inc_needed_dyntick(int cpu)

    >
    > Unneeded newline.


    25-year-old habits die hard. Fixed!

    > > +{
    > > + long curr;
    > > + long snap;
    > > + struct rcu_dyntick_sched *rdssp = &per_cpu(rcu_dyntick_sched, cpu);
    > > +
    > > + curr = rdssp->dynticks;
    > > + snap = rdssp->sched_dynticks_snap;
    > > + smp_mb(); /* force ordering with cpu entering/leaving dynticks. */
    > > +
    > > + /*
    > > + * If the CPU remained in dynticks mode for the entire time
    > > + * and didn't take any interrupts, NMIs, SMIs, or whatever,
    > > + * then it cannot be in the middle of an rcu_read_lock(), so
    > > + * the next rcu_read_lock() it executes must use the new value
    > > + * of the counter. Therefore, this CPU has been in a quiescent
    > > + * state the entire time, and we don't need to wait for it.
    > > + */
    > > +
    > > + if ((curr == snap) && ((curr & 0x1) == 0))
    > > + return 0;
    > > +
    > > + /*
    > > + * If the CPU passed through or entered a dynticks idle phase with
    > > + * no active irq handlers, then, as above, this CPU has already
    > > + * passed through a quiescent state.
    > > + */
    > > +
    > > + if ((curr - snap) > 2 || (snap & 0x1) == 0)
    > > + return 0;
    > > +
    > > + /* We need this CPU to go through a quiescent state. */
    > > +
    > > + return 1;
    > > +}

    >
    > That's a pretty big inline. It only has a single callsite so the compiler
    > should inline it for us. And if it grows a second callsite, the inlining
    > is probably wrong.


    K, removed the "inline". Though it is not all that big if comments are
    removed. ;-)

    > > +static inline int
    > > +rcu_qsctr_inc_needed(int cpu)

    >
    > Unneeded newline.


    Fixed.

    > > /*
    > > * Get here when RCU is idle. Decide whether we need to
    > > * move out of idle state, and return non-zero if so.
    > > @@ -821,6 +924,13 @@ void rcu_check_callbacks(int cpu, int us
    > > unsigned long flags;
    > > struct rcu_data *rdp = RCU_DATA_CPU(cpu);
    > >
    > > + if (user ||
    > > + (idle_cpu(cpu) && !in_softirq() &&
    > > + hardirq_count() <= (1 << HARDIRQ_SHIFT))) {

    >
    > I think this test could do with a bigfatcomment explaining what it is doing.


    How about the following placed before the "if"?

    /*
    * If this CPU took its interrupt from user mode or from the
    * idle loop, and this is not a nested interrupt, then
    * this CPU has to have exited all prior preempt-disable
    * sections of code. So increment the counter to note this.
    *
    * The memory barrier is needed to handle the case where
    * writes from a preempt-disable section of code get reordered
    * into schedule() by this CPU's write buffer. So the memory
    * barrier makes sure that the rcu_qsctr_inc() is seen by other
    * CPUs to happen after any such write.
    */

    And I guess rcuclassic.c needs a similar comment in its version of
    rcu_check_callbacks(). I will add this in a separate patch in this
    set of patches.

    > > + smp_mb(); /* Guard against aggressive schedule(). */
    > > + rcu_qsctr_inc(cpu);
    > > + }
    > > +
    > > rcu_check_mb(cpu);
    > > if (rcu_ctrlblk.completed == rdp->completed)
    > > rcu_try_flip();
    > >
    > > ...
    > >
    > > +
    > > + /*
    > > + * The rcu_sched grace-period processing might have bypassed
    > > + * this CPU, given that it was not in the rcu_cpu_online_map
    > > + * when the grace-period scan started. This means that the
    > > + * grace-period task might sleep. So make sure that if this
    > > + * should happen, the first callback posted to this CPU will
    > > + * wake up the grace-period task if need be.
    > > + */
    > > +
    > > + local_irq_save(flags);
    > > + rdp = RCU_DATA_ME();
    > > + spin_lock(&rdp->lock);

    >
    > I assume that splitting the irq-disable from the spin_lock is a little
    > latency optimisation?


    As Gautham pointed out, we cannot allow the task to be preempted between
    the time that we pick up rdp and the time that we acquire rdp->lock.

    Hmmm... Wait a minute... This should set "cpu"'s rdp->rcu_sched_sleeping
    to 1, not some random CPU's. Guess I should fix that, thank you!!!

    The new code is as follows:

    rdp = RCU_DATA_CPU(cpu);
    spin_lock_irqsave(&rdp->lock, flags);
    rdp->rcu_sched_sleeping = 1;
    spin_unlock_irqrestore(&rdp->lock, flags);

    > > + rdp->rcu_sched_sleeping = 1;
    > > + spin_unlock_irqrestore(&rdp->lock, flags);
    > > }
    > >
    > > #else /* #ifdef CONFIG_HOTPLUG_CPU */
    > > @@ -993,26 +1129,194 @@ void call_rcu(struct rcu_head *head, voi
    > > }
    > > EXPORT_SYMBOL_GPL(call_rcu);
    > >
    > > +void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
    > > +{
    > > + unsigned long flags;
    > > + struct rcu_data *rdp;
    > > + int wake_gp = 0;
    > > +
    > > + head->func = func;
    > > + head->next = NULL;
    > > + local_irq_save(flags);
    > > + rdp = RCU_DATA_ME();
    > > + spin_lock(&rdp->lock);
    > > + *rdp->nextschedtail = head;
    > > + rdp->nextschedtail = &head->next;
    > > + if (rdp->rcu_sched_sleeping) {
    > > +
    > > + /* Grace-period processing might be sleeping... */
    > > +
    > > + rdp->rcu_sched_sleeping = 0;
    > > + wake_gp = 1;
    > > + }
    > > + spin_unlock(&rdp->lock);
    > > + local_irq_restore(flags);

    >
    > spin_unlock_irqrestore() here would be consistent with the above.


    One line less code as well. I fixed call_rcu() as well.
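
    So the enqueue path in call_rcu_sched() now ends as follows
    (reassembled sketch of the updated code):

        *rdp->nextschedtail = head;
        rdp->nextschedtail = &head->next;
        if (rdp->rcu_sched_sleeping) {
                /* Grace-period processing might be sleeping... */
                rdp->rcu_sched_sleeping = 0;
                wake_gp = 1;
        }
        spin_unlock_irqrestore(&rdp->lock, flags);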

    > > + if (wake_gp) {
    > > +
    > > + /* Wake up grace-period processing, unless someone beat us. */
    > > +
    > > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);

    >
    > If wake_gp!=0 is common then we could microoptimise straight-line
    > performance here by retaining the irq-offness from above.


    If wake_gp!=0 is common, then the grace-period task must be spending
    most of its time asleep, which in turn means that call_rcu_sched()
    itself is being invoked only rarely.  In that case I would guess that
    we should avoid the optimization, correct?

    > > + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleeping)
    > > + wake_gp = 0;
    > > + rcu_ctrlblk.sched_sleep = rcu_sched_not_sleeping;
    > > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > > + if (wake_gp)
    > > + wake_up_interruptible(&rcu_ctrlblk.sched_wq);
    > > + }
    > > +}
    > > +EXPORT_SYMBOL_GPL(call_rcu_sched);
    > >
    > > ...
    > >
    > > +static int
    > > +rcu_sched_grace_period(void *arg)

    >
    > Unneeded newline.


    Fixed.

    > > {
    > > - cpumask_t oldmask;
    > > + int couldsleep; /* might sleep after current pass. */
    > > + int couldsleepnext = 0; /* might sleep after next pass. */
    > > int cpu;
    > > + unsigned long flags;
    > > + struct rcu_data *rdp;
    > > + int ret;
    > >
    > > - if (sched_getaffinity(0, &oldmask) < 0)
    > > - oldmask = cpu_possible_map;
    > > - for_each_online_cpu(cpu) {
    > > - sched_setaffinity(0, cpumask_of_cpu(cpu));
    > > - schedule();
    > > - }
    > > - sched_setaffinity(0, oldmask);
    > > + /*
    > > + * Each pass through the following loop handles one
    > > + * rcu_sched grace period cycle.
    > > + */
    > > +
    > > + do {
    > > +
    > > + /* Save each CPU's current state. */
    > > +
    > > + for_each_online_cpu(cpu) {

    >
    > Numerous unneeded newline


    OK. I am guessing the one after the "do". The others are the ones
    following each comment block?

    > > + dyntick_save_progress_counter_sched(cpu);
    > > + save_qsctr_sched(cpu);
    > > + }
    > > +
    > > + /*
    > > + * Sleep for about an RCU grace-period's worth to
    > > + * allow better batching and to consume less CPU.
    > > + */
    > > +
    > > + schedule_timeout_interruptible(HZ / 20);

    >
    > eek, a magic number.


    Added a cpp macro RCU_SCHED_BATCH_TIME defined to be HZ / 20. ;-)
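
    Something along these lines (where exactly the #define lands is
    arbitrary):

        #define RCU_SCHED_BATCH_TIME (HZ / 20)

                /*
                 * Sleep for about an RCU grace-period's worth to
                 * allow better batching and to consume less CPU.
                 */
                schedule_timeout_interruptible(RCU_SCHED_BATCH_TIME);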

    > > + /*
    > > + * If there was nothing to do last time, prepare to
    > > + * sleep at the end of the current grace period cycle.
    > > + */
    > > +
    > > + couldsleep = couldsleepnext;
    > > + couldsleepnext = 1;
    > > + if (couldsleep) {
    > > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    > > + rcu_ctrlblk.sched_sleep = rcu_sched_sleep_prep;
    > > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > > + }

    >
    > If the above locking actually correct and needed? The write to
    > rcu_ctrlblk.sched_sleep is a single word...


    Yes, but the code sequence in call_rcu_sched() that is protected by
    the same lock would be shocked and disappointed if that write showed
    up in the middle of its critical section. ;-)
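
    For reference, the critical section in call_rcu_sched() that this
    write has to stay out of (from the hunk quoted earlier) is:

        spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
        if (rcu_ctrlblk.sched_sleep != rcu_sched_sleeping)
                wake_gp = 0;
        rcu_ctrlblk.sched_sleep = rcu_sched_not_sleeping;
        spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
        if (wake_gp)
                wake_up_interruptible(&rcu_ctrlblk.sched_wq);

    The lock keeps that test-and-overwrite of sched_sleep atomic with
    respect to the grace-period task's updates.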

    > > + /*
    > > + * Wait on each CPU in turn to have either visited
    > > + * a quiescent state or been in dynticks-idle mode.
    > > + */
    > > +
    > > + for_each_online_cpu(cpu) {
    > > + while (rcu_qsctr_inc_needed(cpu) &&
    > > + rcu_qsctr_inc_needed_dyntick(cpu)) {
    > > + /* resched_cpu(cpu); */
    > > + schedule_timeout_interruptible(1);
    > > + }
    > > + }
    > > +
    > > + /*
    > > + * Advance callbacks for each CPU.
    > > + */
    > > +
    > > + for_each_online_cpu(cpu) {

    >
    > It's more conventional to omit the blank line after the above form of
    > comment block.


    Also no point in its being a multi-line comment block. Changed to
    single-line comment block. And deleted the blank lines following
    each multi-line comment block. More 25-year-old habits...

    > > + rdp = RCU_DATA_CPU(cpu);
    > > + spin_lock_irqsave(&rdp->lock, flags);
    > > +
    > > + /*
    > > + * We are running on this CPU irq-disabled, so no
    > > + * CPU can go offline until we re-enable irqs.

    >
    > but, but, but. The cpu at `cpu' could have gone offline just before we
    > disabled local interrupts.


    Indeed. I added the following to the comment:

    * The CPU at `cpu' might have already gone
    * offline (between the for_each_online_cpu and
    * the spin_lock_irqsave), but in that case all its
    * callback lists will be empty, so no harm done.

    Good catch!!!
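
    So the full comment would presumably read something like:

        /*
         * We are running on this CPU irq-disabled, so no
         * CPU can go offline until we re-enable irqs.
         * The CPU at `cpu' might have already gone
         * offline (between the for_each_online_cpu and
         * the spin_lock_irqsave), but in that case all its
         * callback lists will be empty, so no harm done.
         *
         * Advance the callbacks!  We share normal RCU's
         * donelist, since callbacks are invoked the
         * same way in either case.
         */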

    > > + * Advance the callbacks! We share normal RCU's
    > > + * donelist, since callbacks are invoked the
    > > + * same way in either case.
    > > + */
    > > +
    > > + if (rdp->waitschedlist != NULL) {
    > > + *rdp->donetail = rdp->waitschedlist;
    > > + rdp->donetail = rdp->waitschedtail;
    > > +
    > > + /*
    > > + * Next rcu_check_callbacks() will
    > > + * do the required raise_softirq().
    > > + */
    > > + }
    > > + if (rdp->nextschedlist != NULL) {
    > > + rdp->waitschedlist = rdp->nextschedlist;
    > > + rdp->waitschedtail = rdp->nextschedtail;
    > > + couldsleep = 0;
    > > + couldsleepnext = 0;
    > > + } else {
    > > + rdp->waitschedlist = NULL;
    > > + rdp->waitschedtail = &rdp->waitschedlist;
    > > + }
    > > + rdp->nextschedlist = NULL;
    > > + rdp->nextschedtail = &rdp->nextschedlist;
    > > +
    > > + /* Mark sleep intention. */
    > > +
    > > + rdp->rcu_sched_sleeping = couldsleep;
    > > +
    > > + spin_unlock_irqrestore(&rdp->lock, flags);
    > > + }
    > > +
    > > + /* If we saw callbacks on the last scan, go deal with them. */
    > > +
    > > + if (!couldsleep)
    > > + continue;
    > > +
    > > + /* Attempt to block... */
    > > +
    > > + spin_lock_irqsave(&rcu_ctrlblk.schedlock, flags);
    > > + if (rcu_ctrlblk.sched_sleep != rcu_sched_sleep_prep) {
    > > +
    > > + /*
    > > + * Someone posted a callback after we scanned.
    > > + * Go take care of it.
    > > + */
    > > +
    > > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > > + couldsleepnext = 0;
    > > + continue;
    > > + }
    > > +
    > > + /* Block until the next person posts a callback. */
    > > +
    > > + rcu_ctrlblk.sched_sleep = rcu_sched_sleeping;
    > > + spin_unlock_irqrestore(&rcu_ctrlblk.schedlock, flags);
    > > + ret = 0;
    > > + __wait_event_interruptible(rcu_ctrlblk.sched_wq,
    > > + rcu_ctrlblk.sched_sleep != rcu_sched_sleeping,
    > > + ret);
    > > + if (ret)
    > > + flush_signals(current);

    >
    > That flush_signals() was a surprise. A desurprising comment would be nice.


    I added the following comment:

    /*
    * Signals would prevent us from sleeping, and we cannot
    * do much with them in any case. So flush them.
    */

    Seem reasonable?
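
    With that comment dropped in right above the flush_signals(), the
    tail of the wait sequence would read:

        ret = 0;
        __wait_event_interruptible(rcu_ctrlblk.sched_wq,
                rcu_ctrlblk.sched_sleep != rcu_sched_sleeping,
                ret);

        /*
         * Signals would prevent us from sleeping, and we cannot
         * do much with them in any case.  So flush them.
         */
        if (ret)
                flush_signals(current);
        couldsleepnext = 0;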

    > > + couldsleepnext = 0;
    > > +
    > > + } while (!kthread_should_stop());
    > > +
    > > + return (0);
    > > }
    > > -EXPORT_SYMBOL_GPL(__synchronize_sched);
    > >
    > > /*
    > > * Check to see if any future RCU-related work will need to be done
    > > @@ -1029,7 +1333,9 @@ int rcu_needs_cpu(int cpu)
    > >
    > > return (rdp->donelist != NULL ||
    > > !!rdp->waitlistcount ||
    > > - rdp->nextlist != NULL);
    > > + rdp->nextlist != NULL ||
    > > + rdp->nextschedlist != NULL ||
    > > + rdp->waitschedlist != NULL);
    > > }
    > >
    > > int rcu_pending(int cpu)
    > > @@ -1040,7 +1346,9 @@ int rcu_pending(int cpu)
    > >
    > > if (rdp->donelist != NULL ||
    > > !!rdp->waitlistcount ||
    > > - rdp->nextlist != NULL)
    > > + rdp->nextlist != NULL ||
    > > + rdp->nextschedlist != NULL ||
    > > + rdp->waitschedlist != NULL)
    > > return 1;
    > >
    > > /* The RCU core needs an acknowledgement from this CPU. */
    > > @@ -1107,6 +1415,11 @@ void __init __rcu_init(void)
    > > rdp->donetail = &rdp->donelist;
    > > rdp->rcu_flipctr[0] = 0;
    > > rdp->rcu_flipctr[1] = 0;
    > > + rdp->nextschedlist = NULL;
    > > + rdp->nextschedtail = &rdp->nextschedlist;
    > > + rdp->waitschedlist = NULL;
    > > + rdp->waitschedtail = &rdp->waitschedlist;
    > > + rdp->rcu_sched_sleeping = 0;
    > > }
    > > register_cpu_notifier(&rcu_nb);
    > >
    > > @@ -1129,6 +1442,18 @@ void __init __rcu_init(void)
    > > }
    > >
    > > /*
    > > + * Late-boot-time RCU initialization that must wait until after scheduler
    > > + * has been initialized.
    > > + */
    > > +void __init rcu_init_sched(void)
    > > +{
    > > + rcu_sched_grace_period_task = kthread_run(rcu_sched_grace_period,
    > > + NULL,
    > > + "rcu_sched_grace_period");
    > > + WARN_ON(IS_ERR(rcu_sched_grace_period_task));
    > > +}
    > > +
    > > +/*
    > > * Deprecated, use synchronize_rcu() or synchronize_sched() instead.
    > > */
    > > void synchronize_kernel(void)

    >
    > I suspect I don't understand any of the RCU code any more.


    This definition for synchronize_kernel() somehow slipped back in.
    I have removed it. Sorry for the confusion!!!

    Thanx, Paul

  15. Re: [PATCH,RFC] Add call_rcu_sched()

    On Tue, Apr 08, 2008 at 02:07:58AM -0700, Andrew Morton wrote:
    > On Tue, 8 Apr 2008 14:26:37 +0530 Gautham R Shenoy wrote:
    >
    > > On Tue, Apr 08, 2008 at 01:39:36AM -0700, Andrew Morton wrote:
    > > > On Tue, 8 Apr 2008 13:40:48 +0530 Gautham R Shenoy wrote:
    > > >
    > > > > > > + rdp = RCU_DATA_CPU(cpu);
    > > > > <-- here ------\
    > > > > > > + spin_lock_irqsave(&rdp->lock, flags); |
    > > > > > > + |
    > > > > > > + /* |
    > > > > > > + * We are running on this CPU irq-disabled, so no |
    > > > > > > + * CPU can go offline until we re-enable irqs. |
    > > > > > |
    > > > > > but, but, but. The cpu at `cpu' could have gone offline just before we |
    > > > > > disabled local interrupts. |
    > > > > |
    > > > > In that case the CPU_DEAD callback should have migrated the rcu-lists to |
    > > > > a cpu which is online. |
    > > > |
    > > > But local variable rdp might be pointing at the now-offlined CPU's data? -------/

    > >
    > > Right. But then rdp wouldn't contain anything useful at this point.
    > > So, we may only end up taking the rdp->lock, observe that there's nothing to do,
    > > and move on.
    > >
    > > Is there something else that I am missing?

    >
    > erm, I guess that'll work OK.
    >
    > There were intentions to release the per-cpu memory during unplug, but
    > nobody has threatened to do that for a while.


    That would be a bit painful in a number of situations... :-/

    Thanx, Paul
