Jida's Blog

Concurrency in Go (4): singleflight

1st January 2025
Golang
Concurrency
Cache Optimization
singleflight
Last updated: 25th January 2025
9 Minutes
1655 Words

Intro

When building a backend server with a database, adding a caching layer, such as Redis, is a common practice to prevent excessive database queries. However, in the case of a cache miss storm, where numerous requests simultaneously query the same expired key, the resulting cache misses can overwhelm the database with a surge of queries, potentially leading to performance degradation or timeouts.

[diagram: a cache miss storm flooding the database with identical queries]

To address this issue, the singleflight package can be used to combine multiple identical requests into one within a short period, reducing the database load from N queries to just 1 and improving the system's robustness. It allows only one "in-flight" (ongoing) operation for the same "key" at any given time.

[diagram: identical requests combined into a single database query by singleflight]

How to use

1. Do

concepts

  • Syntax:

    func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
  • Behavior: executes fn and returns its results, making sure only one execution is in-flight for a given key at a time. If a duplicate call comes in, the duplicate caller waits for the original to complete and receives the same results.

  • Returns:

    • v interface{}, err error: returned values of fn
    • shared bool: indicates whether v was given to multiple callers

example

The example below simulates a simple case where 5 goroutines try to fetch the same data, spaced 40 ms apart. Thanks to singleflight.Group, concurrent callers share a single in-flight fetchData() call, reducing the overhead.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/singleflight"
)

var callCount atomic.Int32
var wg sync.WaitGroup

// Simulate a function that fetches data from a database
func fetchData() (interface{}, error) {
	callCount.Add(1)
	time.Sleep(100 * time.Millisecond)
	return rand.Intn(100), nil
}

// Wrap the fetchData function with singleflight
func fetchDataWrapper(g *singleflight.Group, id int) error {
	defer wg.Done()

	time.Sleep(time.Duration(id) * 40 * time.Millisecond)

	// Assign a unique key to track these requests
	v, err, shared := g.Do("key-fetch-data", fetchData)
	if err != nil {
		return err
	}

	fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared)
	return nil
}

func main() {
	var g singleflight.Group

	// 5 goroutines to fetch the same data
	const numGoroutines = 5
	wg.Add(numGoroutines)

	for i := 0; i < numGoroutines; i++ {
		go fetchDataWrapper(&g, i)
	}

	wg.Wait()
	fmt.Printf("Function was called %d times\n", callCount.Load())
}

The diagram below shows singleflight in action.

From 0 ms to 100 ms, while G0 is fetching the data, G1 and G2 ask for the same key, so they wait for the result rather than starting new calls. Once G0's call finishes, all waiting goroutines get the same result.

Therefore, even though 5 goroutines fetch data, fetchData only ran twice: G3 arrives after the first call has completed, so it starts a second call, which G4 then joins.

[diagram: timeline of the five goroutines sharing two fetchData calls]

The results below confirm this.

You may wonder why shared is true for G0, given that it is the first goroutine. The reason is that shared in g.Do reports whether the result was delivered to more than one caller; it is true for every caller of a shared call, including the one that actually executed fn.

Terminal window

go run main.go
Goroutine 0: result: 55, shared: true
Goroutine 1: result: 55, shared: true
Goroutine 2: result: 55, shared: true
Goroutine 3: result: 24, shared: true
Goroutine 4: result: 24, shared: true
Function was called 2 times

2. DoChan

concepts

  • Syntax:

    func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
  • Behavior: similar to Do, but instead of blocking, it returns a channel that will receive the Result when it is ready. The returned channel is never closed.

  • Use case: useful if you prefer handling the result asynchronously or if you're selecting over multiple channels.

example

In real scenarios, fetching data from the database could take unexpectedly long. Using DoChan gives us timeout control, which helps prevent unexpectedly long blocking.

func doChanExample() {
	fetchData := func() (interface{}, error) {
		// infinite blocking: simulate a query that never returns
		select {}
	}

	g := singleflight.Group{}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	ch := g.DoChan("key-fetch-data", fetchData)
	select {
	case res := <-ch:
		if res.Err != nil {
			fmt.Printf("error: %v\n", res.Err)
			return
		}
		fmt.Printf("result: %v, shared: %v\n", res.Val, res.Shared)
	case <-ctx.Done():
		fmt.Println("timeout")
	}
}

// output:
// timeout

3. Forget

concepts

  • Syntax:

    func (g *Group) Forget(key string)
  • Behavior: tell the singleflight to forget about a key. Future calls to Do for this key will call the function rather than waiting for an earlier call to complete.

example

The Group.Forget method allows you to explicitly remove a key from the group’s internal mapping, so subsequent calls for the same key will not wait for the previous function to return.

func forgetExample() {
	// Function to simulate a long-running process
	fetchData := func(key string) (interface{}, error) {
		fmt.Printf("Fetching data for key: %s\n", key)
		time.Sleep(2 * time.Second) // Simulate a delay
		return fmt.Sprintf("Result for %s", key), nil
	}

	var g singleflight.Group

	var wg sync.WaitGroup
	wg.Add(3)

	// Goroutine 1: Call fetchData with key "key1"
	go func() {
		result, err, shared := g.Do("key1", func() (interface{}, error) {
			return fetchData("key1")
		})
		if err != nil {
			fmt.Println("Error:", err)
		} else {
			fmt.Printf("G1 received: %v (shared: %v)\n", result, shared)
		}
		wg.Done()
	}()

	// Goroutine 2: Forget the key before the result is available
	go func() {
		time.Sleep(1 * time.Second) // Wait a bit to simulate concurrency
		g.Forget("key1")
		fmt.Println("G2 called Forget for key1")
		wg.Done()
	}()

	// Goroutine 3: Call fetchData with key "key1" after Forget
	go func() {
		time.Sleep(1 * time.Second) // Wait to ensure Forget is called first
		result, err, shared := g.Do("key1", func() (interface{}, error) {
			return fetchData("key1")
		})
		if err != nil {
			fmt.Println("Error:", err)
		} else {
			fmt.Printf("G3 received: %v (shared: %v)\n", result, shared)
		}
		wg.Done()
	}()

	wg.Wait()
}
Terminal window

go run main.go
Fetching data for key: key1 # fetched by G1
G2 called Forget for key1
Fetching data for key: key1 # fetched by G3
G1 received: Result for key1 (shared: false)
G3 received: Result for key1 (shared: false)

Example of cache miss

  • The following are the benchmark results for the scenario shown in the diagrams in the Intro section. You may view and run the code by cloning the repo: Cache Miss Example.
Terminal window

BenchmarkQueryUserSingleflight_1000-12
DB query was called 1 times
...
1000000000    0.1379 ns/op

BenchmarkQueryUser_1000-12
DB query was called 1000 times
...
1000000000    0.1397 ns/op

BenchmarkQueryUserSingleflight_10000-12
DB query was called 2 times
...
1000000000    0.5933 ns/op

BenchmarkQueryUser_10000-12
DB query was called 10000 times
...
1000000000    0.6913 ns/op

How it works

Let’s first take a look at the structs in singleflight.

1. Group & call

type Group struct {
	mu sync.Mutex       // protects the map m
	m  map[string]*call // maps keys to calls; lazily initialized
}

type call struct {
	wg    sync.WaitGroup  // waits for the function execution
	val   interface{}     // result of the function call
	err   error           // error from the function call
	dups  int             // number of duplicate callers
	chans []chan<- Result // channels to receive the result
}
  • Group mutex mu: protects the entire map of keys (a single lock, not one lock per key), ensuring updates to the map are thread-safe.
  • WaitGroup wg: used to wait for the first goroutine associated with a specific key to finish its work.

2. g.Do()

Source code:

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	// lock the call map
	g.mu.Lock()

	// if the map is not initialized, initialize it
	if g.m == nil {
		g.m = make(map[string]*call)
	}

	// if the call is found in the map
	if c, ok := g.m[key]; ok {
		// increment a counter to track dup requests
		c.dups++
		// release the lock
		g.mu.Unlock()
		// wait for the original task to complete
		c.wg.Wait()

		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		return c.val, c.err, true
	}
	// if no other goroutine is doing the request,
	// create a new call object
	c := new(call)
	c.wg.Add(1)
	// add it to the call map
	g.m[key] = c
	g.mu.Unlock()

	// execute the function
	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}

2-1. runtime.Goexit()

concepts

runtime.Goexit() stops the execution of the calling goroutine. When a goroutine calls Goexit(), it stops, and any deferred functions still run in last-in-first-out (LIFO) order, just as they normally would.

Be careful: it does not trigger a panic, and only the goroutine that calls Goexit() is terminated; all other goroutines keep running.

experiments

In this code snippet, Goexit() terminates the main goroutine. The Go runtime stays alive as long as at least one other goroutine is active, so the sub goroutine still runs; once no goroutine is left, the runtime exits with a fatal error.

func firstTry() {
	go func() {
		fmt.Println("sub goroutine called")
	}()

	runtime.Goexit()
	fmt.Println("main goroutine called")
}

Terminal window

go run goexit.go
sub goroutine called
fatal error: no goroutines (main called runtime.Goexit) - deadlock!
exit status 2

2-2. doCall()

doCall() handles the single call for a key. It uses two deferred functions to distinguish a panic from runtime.Goexit.

When runtime.Goexit() is called, the entire goroutine is terminated, like an unrecovered panic(). However, if a panic() is recovered, only the chain of functions between the panic() and the recover() is unwound, not the entire goroutine.

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false

	// use double-defer to distinguish panic from runtime.Goexit
	defer func() {
		// the given function invoked runtime.Goexit
		if !normalReturn && !recovered {
			c.err = errGoexit
		}
		... // handle each case
	}()

	func() {
		defer func() {
			if !normalReturn {
				if r := recover(); r != nil {
					c.err = newPanicError(r)
				}
			}
		}()

		c.val, c.err = fn()
		normalReturn = true
	}()

	if !normalReturn {
		recovered = true
	}
}

Summary

  1. Use Do() for the synchronous case and DoChan() for the asynchronous case.
  2. If one goroutine is blocked fetching Resource A, any other goroutines fetching Resource A will also block.
  3. Use DoChan for more customized control, such as timeouts.
  4. Once the calling goroutine receives the value or the error, all the other waiting goroutines receive the same value or error.
  5. The Group.Forget method allows you to explicitly remove a key from the group’s internal mapping, so subsequent calls for the same key will not wait for the previous function to return.

To view the code in the example, you may go to: https://github.com/jidalii/go-playground/tree/main/goSingleflight.
