Ensure that that one goroutine exits when the iterator is closed

This resolves that completely nonsensical memory leak situation. As far
as we can understand, the cause was a hodgepodge of the following:

- There is some buffer sharing going on deep in pgx
- Queries made with a cancellable but long-running context (like that
used for background jobs) would leave iterator-related goroutines
hanging
- These goroutines had a pgx `rows` object in their closures, preventing
the row stuff from being garbage collected
- If you look at a profile, it all appears to be caused by whatever
functions were doing the most database queries / reading the most from
Postgres. In fact those things were _allocating_ the most but not
retaining any of that data - it was being retained by these other
goroutines because of magic buffer sharing huzzah I love it

We could have solved this in approximately 30 minutes if Go could
actually tell us what is keeping things alive in the heap, instead of
just tracking allocations.
This commit is contained in:
Ben Visness 2021-10-21 01:42:34 -05:00
parent 34a318c902
commit 623aaec9d8
1 changed files with 12 additions and 2 deletions

View File

@ -93,11 +93,13 @@ type StructQueryIterator struct {
fieldPaths [][]int fieldPaths [][]int
rows pgx.Rows rows pgx.Rows
destType reflect.Type destType reflect.Type
closed chan struct{}
} }
func (it *StructQueryIterator) Next() (interface{}, bool) { func (it *StructQueryIterator) Next() (interface{}, bool) {
hasNext := it.rows.Next() hasNext := it.rows.Next()
if !hasNext { if !hasNext {
it.Close()
return nil, false return nil, false
} }
@ -169,6 +171,10 @@ func (it *StructQueryIterator) Next() (interface{}, bool) {
func (it *StructQueryIterator) Close() { func (it *StructQueryIterator) Close() {
it.rows.Close() it.rows.Close()
select {
case it.closed <- struct{}{}:
default:
}
} }
func (it *StructQueryIterator) ToSlice() []interface{} { func (it *StructQueryIterator) ToSlice() []interface{} {
@ -241,6 +247,7 @@ func Query(ctx context.Context, conn ConnOrTx, destExample interface{}, query st
fieldPaths: fieldPaths, fieldPaths: fieldPaths,
rows: rows, rows: rows,
destType: destType, destType: destType,
closed: make(chan struct{}, 1),
} }
// Ensure that iterators are closed if context is cancelled. Otherwise, iterators can hold // Ensure that iterators are closed if context is cancelled. Otherwise, iterators can hold
@ -250,8 +257,11 @@ func Query(ctx context.Context, conn ConnOrTx, destExample interface{}, query st
if done == nil { if done == nil {
return return
} }
<-done select {
case <-done:
it.Close() it.Close()
case <-it.closed:
}
}() }()
return it, nil return it, nil