// Copyright 2014 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package runtime import ( "runtime/internal/atomic" "unsafe" ) // Functions temporarily called by C code. //go:linkname newextram runtime.newextram //go:linkname acquirep runtime.acquirep //go:linkname releasep runtime.releasep //go:linkname incidlelocked runtime.incidlelocked //go:linkname checkdead runtime.checkdead //go:linkname sysmon runtime.sysmon //go:linkname schedtrace runtime.schedtrace //go:linkname allgadd runtime.allgadd //go:linkname mcommoninit runtime.mcommoninit //go:linkname ready runtime.ready //go:linkname gcprocs runtime.gcprocs //go:linkname needaddgcproc runtime.needaddgcproc //go:linkname stopm runtime.stopm //go:linkname handoffp runtime.handoffp //go:linkname wakep runtime.wakep //go:linkname stoplockedm runtime.stoplockedm //go:linkname schedule runtime.schedule //go:linkname execute runtime.execute //go:linkname gfput runtime.gfput //go:linkname gfget runtime.gfget //go:linkname lockOSThread runtime.lockOSThread //go:linkname unlockOSThread runtime.unlockOSThread //go:linkname procresize runtime.procresize //go:linkname helpgc runtime.helpgc //go:linkname stopTheWorldWithSema runtime.stopTheWorldWithSema //go:linkname startTheWorldWithSema runtime.startTheWorldWithSema //go:linkname mput runtime.mput //go:linkname mget runtime.mget //go:linkname globrunqput runtime.globrunqput //go:linkname pidleget runtime.pidleget //go:linkname runqempty runtime.runqempty //go:linkname runqput runtime.runqput // Function called by misc/cgo/test. //go:linkname lockedOSThread runtime.lockedOSThread // Functions temporarily in C that have not yet been ported. func allocm(*p, bool, *unsafe.Pointer, *uintptr) *m func malg(bool, bool, *unsafe.Pointer, *uintptr) *g func startm(*p, bool) func newm(unsafe.Pointer, *p) func gchelper() func getfingwait() bool func getfingwake() bool func wakefing() *g // C functions for ucontext management. func gogo(*g) func setGContext() func makeGContext(*g, unsafe.Pointer, uintptr) func getTraceback(me, gp *g) // main_init_done is a signal used by cgocallbackg that initialization // has been completed. It is made before _cgo_notify_runtime_init_done, // so all cgo calls can rely on it existing. When main_init is complete, // it is closed, meaning cgocallbackg can reliably receive from it. var main_init_done chan bool func goready(gp *g, traceskip int) { systemstack(func() { ready(gp, traceskip, true) }) } //go:nosplit func acquireSudog() *sudog { // Delicate dance: the semaphore implementation calls // acquireSudog, acquireSudog calls new(sudog), // new calls malloc, malloc can call the garbage collector, // and the garbage collector calls the semaphore implementation // in stopTheWorld. // Break the cycle by doing acquirem/releasem around new(sudog). // The acquirem/releasem increments m.locks during new(sudog), // which keeps the garbage collector from being invoked. mp := acquirem() pp := mp.p.ptr() if len(pp.sudogcache) == 0 { lock(&sched.sudoglock) // First, try to grab a batch from central cache. for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil { s := sched.sudogcache sched.sudogcache = s.next s.next = nil pp.sudogcache = append(pp.sudogcache, s) } unlock(&sched.sudoglock) // If the central cache is empty, allocate a new one. 
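	// (The same two-level scheme — a per-P slice refilled in batches from
	// a locked global LIFO, falling back to allocation — is a general
	// pattern. A stand-alone, user-level sketch, with hypothetical item,
	// sharedPool and localCache types and "sync" imported; not the
	// runtime's own code:
	//
	//	type item struct{ next *item }
	//
	//	type sharedPool struct {
	//		mu   sync.Mutex
	//		head *item
	//	}
	//
	//	type localCache struct{ items []*item }
	//
	//	// get prefers the local cache, refills up to half of its
	//	// capacity from the shared pool, and only then allocates.
	//	func (c *localCache) get(p *sharedPool) *item {
	//		if len(c.items) == 0 {
	//			p.mu.Lock()
	//			for len(c.items) < cap(c.items)/2 && p.head != nil {
	//				it := p.head
	//				p.head, it.next = it.next, nil
	//				c.items = append(c.items, it)
	//			}
	//			p.mu.Unlock()
	//			if len(c.items) == 0 {
	//				c.items = append(c.items, new(item))
	//			}
	//		}
	//		n := len(c.items)
	//		it := c.items[n-1]
	//		c.items[n-1] = nil
	//		c.items = c.items[:n-1]
	//		return it
	//	}
	//
	// Refilling only up to cap/2 leaves room so that releaseSudog below
	// can return freed objects locally without immediately spilling.)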
if len(pp.sudogcache) == 0 { pp.sudogcache = append(pp.sudogcache, new(sudog)) } } n := len(pp.sudogcache) s := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil pp.sudogcache = pp.sudogcache[:n-1] if s.elem != nil { throw("acquireSudog: found s.elem != nil in cache") } releasem(mp) return s } //go:nosplit func releaseSudog(s *sudog) { if s.elem != nil { throw("runtime: sudog with non-nil elem") } if s.selectdone != nil { throw("runtime: sudog with non-nil selectdone") } if s.next != nil { throw("runtime: sudog with non-nil next") } if s.prev != nil { throw("runtime: sudog with non-nil prev") } if s.waitlink != nil { throw("runtime: sudog with non-nil waitlink") } if s.c != nil { throw("runtime: sudog with non-nil c") } gp := getg() if gp.param != nil { throw("runtime: releaseSudog with non-nil gp.param") } mp := acquirem() // avoid rescheduling to another P pp := mp.p.ptr() if len(pp.sudogcache) == cap(pp.sudogcache) { // Transfer half of local cache to the central cache. var first, last *sudog for len(pp.sudogcache) > cap(pp.sudogcache)/2 { n := len(pp.sudogcache) p := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil pp.sudogcache = pp.sudogcache[:n-1] if first == nil { first = p } else { last.next = p } last = p } lock(&sched.sudoglock) last.next = sched.sudogcache sched.sudogcache = first unlock(&sched.sudoglock) } pp.sudogcache = append(pp.sudogcache, s) releasem(mp) } // funcPC returns the entry PC of the function f. // It assumes that f is a func value. Otherwise the behavior is undefined. // For gccgo here unless and until we port proc.go. // Note that this differs from the gc implementation; the gc implementation // adds sys.PtrSize to the address of the interface value, but GCC's // alias analysis decides that that can not be a reference to the second // field of the interface, and in some cases it drops the initialization // of the second field as a dead store. //go:nosplit func funcPC(f interface{}) uintptr { i := (*iface)(unsafe.Pointer(&f)) return **(**uintptr)(i.data) } func lockedOSThread() bool { gp := getg() return gp.lockedm != nil && gp.m.lockedg != nil } var ( allgs []*g allglock mutex ) func allgadd(gp *g) { if readgstatus(gp) == _Gidle { throw("allgadd: bad status Gidle") } lock(&allglock) allgs = append(allgs, gp) allglen = uintptr(len(allgs)) // Grow GC rescan list if necessary. if len(allgs) > cap(work.rescan.list) { lock(&work.rescan.lock) l := work.rescan.list // Let append do the heavy lifting, but keep the // length the same. work.rescan.list = append(l[:cap(l)], 0)[:len(l)] unlock(&work.rescan.lock) } unlock(&allglock) } func dumpgstatus(gp *g) { _g_ := getg() print("runtime: gp: gp=", gp, ", goid=", gp.goid, ", gp->atomicstatus=", readgstatus(gp), "\n") print("runtime: g: g=", _g_, ", goid=", _g_.goid, ", g->atomicstatus=", readgstatus(_g_), "\n") } func checkmcount() { // sched lock is held if sched.mcount > sched.maxmcount { print("runtime: program exceeds ", sched.maxmcount, "-thread limit\n") throw("thread exhaustion") } } func mcommoninit(mp *m) { _g_ := getg() // g0 stack won't make sense for user (and is not necessary unwindable). if _g_ != _g_.m.g0 { callers(1, mp.createstack[:]) } mp.fastrand = 0x49f6428a + uint32(mp.id) + uint32(cputicks()) if mp.fastrand == 0 { mp.fastrand = 0x49f6428a } lock(&sched.lock) mp.id = sched.mcount sched.mcount++ checkmcount() mpreinit(mp) // Add to allm so garbage collector doesn't free g->m // when it is just in a register or thread-local storage. 
mp.alllink = allm // NumCgoCall() iterates over allm w/o schedlock, // so we need to publish it safely. atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp)) unlock(&sched.lock) } // Mark gp ready to run. func ready(gp *g, traceskip int, next bool) { if trace.enabled { traceGoUnpark(gp, traceskip) } status := readgstatus(gp) // Mark runnable. _g_ := getg() _g_.m.locks++ // disable preemption because it can be holding p in a local var if status&^_Gscan != _Gwaiting { dumpgstatus(gp) throw("bad g->status in ready") } // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq casgstatus(gp, _Gwaiting, _Grunnable) runqput(_g_.m.p.ptr(), gp, next) if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 { wakep() } _g_.m.locks-- } func gcprocs() int32 { // Figure out how many CPUs to use during GC. // Limited by gomaxprocs, number of actual CPUs, and MaxGcproc. lock(&sched.lock) n := gomaxprocs if n > ncpu { n = ncpu } if n > _MaxGcproc { n = _MaxGcproc } if n > sched.nmidle+1 { // one M is currently running n = sched.nmidle + 1 } unlock(&sched.lock) return n } func needaddgcproc() bool { lock(&sched.lock) n := gomaxprocs if n > ncpu { n = ncpu } if n > _MaxGcproc { n = _MaxGcproc } n -= sched.nmidle + 1 // one M is currently running unlock(&sched.lock) return n > 0 } func helpgc(nproc int32) { _g_ := getg() lock(&sched.lock) pos := 0 for n := int32(1); n < nproc; n++ { // one M is currently running if allp[pos].mcache == _g_.m.mcache { pos++ } mp := mget() if mp == nil { throw("gcprocs inconsistency") } mp.helpgc = n mp.p.set(allp[pos]) mp.mcache = allp[pos].mcache pos++ notewakeup(&mp.park) } unlock(&sched.lock) } // freezeStopWait is a large value that freezetheworld sets // sched.stopwait to in order to request that all Gs permanently stop. const freezeStopWait = 0x7fffffff // freezing is set to non-zero if the runtime is trying to freeze the // world. var freezing uint32 // Similar to stopTheWorld but best-effort and can be called several times. // There is no reverse operation, used during crashing. // This function must not lock any mutexes. func freezetheworld() { atomic.Store(&freezing, 1) // stopwait and preemption requests can be lost // due to races with concurrently executing threads, // so try several times for i := 0; i < 5; i++ { // this should tell the scheduler to not start any new goroutines sched.stopwait = freezeStopWait atomic.Store(&sched.gcwaiting, 1) // this should stop running goroutines if !preemptall() { break // no running goroutines } usleep(1000) } // to be sure usleep(1000) preemptall() usleep(1000) } func isscanstatus(status uint32) bool { if status == _Gscan { throw("isscanstatus: Bad status Gscan") } return status&_Gscan == _Gscan } // All reads and writes of g's status go through readgstatus, casgstatus // castogscanstatus, casfrom_Gscanstatus. //go:nosplit func readgstatus(gp *g) uint32 { return atomic.Load(&gp.atomicstatus) } // Ownership of gcscanvalid: // // If gp is running (meaning status == _Grunning or _Grunning|_Gscan), // then gp owns gp.gcscanvalid, and other goroutines must not modify it. // // Otherwise, a second goroutine can lock the scan state by setting _Gscan // in the status bit and then modify gcscanvalid, and then unlock the scan state. // // Note that the first condition implies an exception to the second: // if a second goroutine changes gp's status to _Grunning|_Gscan, // that second goroutine still does not have the right to modify gcscanvalid. 
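// For illustration, the acquire/release pair below behaves like a
// try-lock built from a single CAS on the status word. A minimal
// stand-alone sketch, using sync/atomic rather than the runtime-internal
// atomics (scanBit, tryLock and unlock are hypothetical names):
//
//	const scanBit = 0x1000
//
//	// tryLock attempts to set the scan bit; on success the caller
//	// owns the scan state until it clears the bit again.
//	func tryLock(status *uint32, old uint32) bool {
//		return atomic.CompareAndSwapUint32(status, old, old|scanBit)
//	}
//
//	// unlock clears the scan bit. The CAS can only fail if someone
//	// else modified the status while the bit was held, which the
//	// ownership rules above forbid.
//	func unlock(status *uint32, locked uint32) {
//		if !atomic.CompareAndSwapUint32(status, locked, locked&^scanBit) {
//			panic("status changed while scan bit held")
//		}
//	}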
// The Gscanstatuses are acting like locks and this releases them.
// If it proves to be a performance hit we should be able to make these
// simple atomic stores but for now we are going to throw if
// we see an inconsistent state.
func casfrom_Gscanstatus(gp *g, oldval, newval uint32) {
	success := false

	// Check that transition is valid.
	switch oldval {
	default:
		print("runtime: casfrom_Gscanstatus bad oldval gp=", gp, ", oldval=", hex(oldval), ", newval=", hex(newval), "\n")
		dumpgstatus(gp)
		throw("casfrom_Gscanstatus:top gp->status is not in scan state")
	case _Gscanrunnable, _Gscanwaiting, _Gscanrunning, _Gscansyscall:
		if newval == oldval&^_Gscan {
			success = atomic.Cas(&gp.atomicstatus, oldval, newval)
		}
	}
	if !success {
		print("runtime: casfrom_Gscanstatus failed gp=", gp, ", oldval=", hex(oldval), ", newval=", hex(newval), "\n")
		dumpgstatus(gp)
		throw("casfrom_Gscanstatus: gp->status is not in scan state")
	}
}

// This will return false if the gp is not in the expected status and the cas fails.
// This acts like a lock acquire while the casfromgstatus acts like a lock release.
func castogscanstatus(gp *g, oldval, newval uint32) bool {
	switch oldval {
	case _Grunnable, _Grunning, _Gwaiting, _Gsyscall:
		if newval == oldval|_Gscan {
			return atomic.Cas(&gp.atomicstatus, oldval, newval)
		}
	}
	print("runtime: castogscanstatus oldval=", hex(oldval), " newval=", hex(newval), "\n")
	throw("castogscanstatus")
	panic("not reached")
}

// If asked to move to or from a Gscanstatus this will throw. Use the castogscanstatus
// and casfrom_Gscanstatus instead.
// casgstatus will loop if the g->atomicstatus is in a Gscan status until the routine that
// put it in the Gscan state is finished.
//go:nosplit
func casgstatus(gp *g, oldval, newval uint32) {
	if (oldval&_Gscan != 0) || (newval&_Gscan != 0) || oldval == newval {
		systemstack(func() {
			print("runtime: casgstatus: oldval=", hex(oldval), " newval=", hex(newval), "\n")
			throw("casgstatus: bad incoming values")
		})
	}

	if oldval == _Grunning && gp.gcscanvalid {
		// If oldval == _Grunning, then the actual status must be
		// _Grunning or _Grunning|_Gscan; either way,
		// we own gp.gcscanvalid, so it's safe to read.
		// gp.gcscanvalid must not be true when we are running.
		print("runtime: casgstatus ", hex(oldval), "->", hex(newval), " gp.status=", hex(gp.atomicstatus), " gp.gcscanvalid=true\n")
		throw("casgstatus")
	}

	// See http://golang.org/cl/21503 for justification of the yield delay.
	const yieldDelay = 5 * 1000
	var nextYield int64

	// loop if gp->atomicstatus is in a scan state giving
	// GC time to finish and change the state to oldval.
	for i := 0; !atomic.Cas(&gp.atomicstatus, oldval, newval); i++ {
		if oldval == _Gwaiting && gp.atomicstatus == _Grunnable {
			systemstack(func() {
				throw("casgstatus: waiting for Gwaiting but is Grunnable")
			})
		}
		// Help GC if needed.
		// if gp.preemptscan && !gp.gcworkdone && (oldval == _Grunning || oldval == _Gsyscall) {
		// 	gp.preemptscan = false
		// 	systemstack(func() {
		// 		gcphasework(gp)
		// 	})
		// }
		// But meanwhile just yield.
		if i == 0 {
			nextYield = nanotime() + yieldDelay
		}
		if nanotime() < nextYield {
			for x := 0; x < 10 && gp.atomicstatus != oldval; x++ {
				procyield(1)
			}
		} else {
			osyield()
			nextYield = nanotime() + yieldDelay/2
		}
	}
	if newval == _Grunning && gp.gcscanvalid {
		// Run queueRescan on the system stack so it has more space.
		systemstack(func() {
			queueRescan(gp)
		})
	}
}

// stopTheWorld stops all P's from executing goroutines, interrupting
// all goroutines at GC safe points and records reason as the reason
// for the stop.
On return, only the current goroutine's P is running. // stopTheWorld must not be called from a system stack and the caller // must not hold worldsema. The caller must call startTheWorld when // other P's should resume execution. // // stopTheWorld is safe for multiple goroutines to call at the // same time. Each will execute its own stop, and the stops will // be serialized. // // This is also used by routines that do stack dumps. If the system is // in panic or being exited, this may not reliably stop all // goroutines. func stopTheWorld(reason string) { semacquire(&worldsema, 0) getg().m.preemptoff = reason systemstack(stopTheWorldWithSema) } // startTheWorld undoes the effects of stopTheWorld. func startTheWorld() { systemstack(startTheWorldWithSema) // worldsema must be held over startTheWorldWithSema to ensure // gomaxprocs cannot change while worldsema is held. semrelease(&worldsema) getg().m.preemptoff = "" } // Holding worldsema grants an M the right to try to stop the world // and prevents gomaxprocs from changing concurrently. var worldsema uint32 = 1 // stopTheWorldWithSema is the core implementation of stopTheWorld. // The caller is responsible for acquiring worldsema and disabling // preemption first and then should stopTheWorldWithSema on the system // stack: // // semacquire(&worldsema, 0) // m.preemptoff = "reason" // systemstack(stopTheWorldWithSema) // // When finished, the caller must either call startTheWorld or undo // these three operations separately: // // m.preemptoff = "" // systemstack(startTheWorldWithSema) // semrelease(&worldsema) // // It is allowed to acquire worldsema once and then execute multiple // startTheWorldWithSema/stopTheWorldWithSema pairs. // Other P's are able to execute between successive calls to // startTheWorldWithSema and stopTheWorldWithSema. // Holding worldsema causes any other goroutines invoking // stopTheWorld to block. func stopTheWorldWithSema() { _g_ := getg() // If we hold a lock, then we won't be able to stop another M // that is blocked trying to acquire the lock. if _g_.m.locks > 0 { throw("stopTheWorld: holding locks") } lock(&sched.lock) sched.stopwait = gomaxprocs atomic.Store(&sched.gcwaiting, 1) preemptall() // stop current P _g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic. sched.stopwait-- // try to retake all P's in Psyscall status for i := 0; i < int(gomaxprocs); i++ { p := allp[i] s := p.status if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) { if trace.enabled { traceGoSysBlock(p) traceProcStop(p) } p.syscalltick++ sched.stopwait-- } } // stop idle P's for { p := pidleget() if p == nil { break } p.status = _Pgcstop sched.stopwait-- } wait := sched.stopwait > 0 unlock(&sched.lock) // wait for remaining P's to stop voluntarily if wait { for { // wait for 100us, then try to re-preempt in case of any races if notetsleep(&sched.stopnote, 100*1000) { noteclear(&sched.stopnote) break } preemptall() } } // sanity checks bad := "" if sched.stopwait != 0 { bad = "stopTheWorld: not stopped (stopwait != 0)" } else { for i := 0; i < int(gomaxprocs); i++ { p := allp[i] if p.status != _Pgcstop { bad = "stopTheWorld: not stopped (status != _Pgcstop)" } } } if atomic.Load(&freezing) != 0 { // Some other thread is panicking. This can cause the // sanity checks above to fail if the panic happens in // the signal handler on a stopped thread. Either way, // we should halt this thread. 
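		// Locking the same mutex twice cannot succeed: the second
		// lock blocks forever, parking this thread permanently
		// instead of letting it return into a half-stopped world.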
lock(&deadlock) lock(&deadlock) } if bad != "" { throw(bad) } } func mhelpgc() { _g_ := getg() _g_.m.helpgc = -1 } func startTheWorldWithSema() { _g_ := getg() _g_.m.locks++ // disable preemption because it can be holding p in a local var gp := netpoll(false) // non-blocking injectglist(gp) add := needaddgcproc() lock(&sched.lock) procs := gomaxprocs if newprocs != 0 { procs = newprocs newprocs = 0 } p1 := procresize(procs) sched.gcwaiting = 0 if sched.sysmonwait != 0 { sched.sysmonwait = 0 notewakeup(&sched.sysmonnote) } unlock(&sched.lock) for p1 != nil { p := p1 p1 = p1.link.ptr() if p.m != 0 { mp := p.m.ptr() p.m = 0 if mp.nextp != 0 { throw("startTheWorld: inconsistent mp->nextp") } mp.nextp.set(p) notewakeup(&mp.park) } else { // Start M to run P. Do not start another M below. newm(nil, p) add = false } } // Wakeup an additional proc in case we have excessive runnable goroutines // in local queues or in the global queue. If we don't, the proc will park itself. // If we have lots of excessive work, resetspinning will unpark additional procs as necessary. if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 { wakep() } if add { // If GC could have used another helper proc, start one now, // in the hope that it will be available next time. // It would have been even better to start it before the collection, // but doing so requires allocating memory, so it's tricky to // coordinate. This lazy approach works out in practice: // we don't mind if the first couple gc rounds don't have quite // the maximum number of procs. newm(unsafe.Pointer(funcPC(mhelpgc)), nil) } _g_.m.locks-- } // forEachP calls fn(p) for every P p when p reaches a GC safe point. // If a P is currently executing code, this will bring the P to a GC // safe point and execute fn on that P. If the P is not executing code // (it is idle or in a syscall), this will call fn(p) directly while // preventing the P from exiting its state. This does not ensure that // fn will run on every CPU executing Go code, but it acts as a global // memory barrier. GC uses this as a "ragged barrier." // // The caller must hold worldsema. // //go:systemstack func forEachP(fn func(*p)) { mp := acquirem() _p_ := getg().m.p.ptr() lock(&sched.lock) if sched.safePointWait != 0 { throw("forEachP: sched.safePointWait != 0") } sched.safePointWait = gomaxprocs - 1 sched.safePointFn = fn // Ask all Ps to run the safe point function. for _, p := range allp[:gomaxprocs] { if p != _p_ { atomic.Store(&p.runSafePointFn, 1) } } preemptall() // Any P entering _Pidle or _Psyscall from now on will observe // p.runSafePointFn == 1 and will call runSafePointFn when // changing its status to _Pidle/_Psyscall. // Run safe point function for all idle Ps. sched.pidle will // not change because we hold sched.lock. for p := sched.pidle.ptr(); p != nil; p = p.link.ptr() { if atomic.Cas(&p.runSafePointFn, 1, 0) { fn(p) sched.safePointWait-- } } wait := sched.safePointWait > 0 unlock(&sched.lock) // Run fn for the current P. fn(_p_) // Force Ps currently in _Psyscall into _Pidle and hand them // off to induce safe point function execution. for i := 0; i < int(gomaxprocs); i++ { p := allp[i] s := p.status if s == _Psyscall && p.runSafePointFn == 1 && atomic.Cas(&p.status, s, _Pidle) { if trace.enabled { traceGoSysBlock(p) traceProcStop(p) } p.syscalltick++ handoffp(p) } } // Wait for remaining Ps to run fn. if wait { for { // Wait for 100us, then try to re-preempt in // case of any races. // // Requires system stack. 
if notetsleep(&sched.safePointNote, 100*1000) { noteclear(&sched.safePointNote) break } preemptall() } } if sched.safePointWait != 0 { throw("forEachP: not done") } for i := 0; i < int(gomaxprocs); i++ { p := allp[i] if p.runSafePointFn != 0 { throw("forEachP: P did not run fn") } } lock(&sched.lock) sched.safePointFn = nil unlock(&sched.lock) releasem(mp) } // runSafePointFn runs the safe point function, if any, for this P. // This should be called like // // if getg().m.p.runSafePointFn != 0 { // runSafePointFn() // } // // runSafePointFn must be checked on any transition in to _Pidle or // _Psyscall to avoid a race where forEachP sees that the P is running // just before the P goes into _Pidle/_Psyscall and neither forEachP // nor the P run the safe-point function. func runSafePointFn() { p := getg().m.p.ptr() // Resolve the race between forEachP running the safe-point // function on this P's behalf and this P running the // safe-point function directly. if !atomic.Cas(&p.runSafePointFn, 1, 0) { return } sched.safePointFn(p) lock(&sched.lock) sched.safePointWait-- if sched.safePointWait == 0 { notewakeup(&sched.safePointNote) } unlock(&sched.lock) } // needm is called when a cgo callback happens on a // thread without an m (a thread not created by Go). // In this case, needm is expected to find an m to use // and return with m, g initialized correctly. // Since m and g are not set now (likely nil, but see below) // needm is limited in what routines it can call. In particular // it can only call nosplit functions (textflag 7) and cannot // do any scheduling that requires an m. // // In order to avoid needing heavy lifting here, we adopt // the following strategy: there is a stack of available m's // that can be stolen. Using compare-and-swap // to pop from the stack has ABA races, so we simulate // a lock by doing an exchange (via casp) to steal the stack // head and replace the top pointer with MLOCKED (1). // This serves as a simple spin lock that we can use even // without an m. The thread that locks the stack in this way // unlocks the stack by storing a valid stack head pointer. // // In order to make sure that there is always an m structure // available to be stolen, we maintain the invariant that there // is always one more than needed. At the beginning of the // program (if cgo is in use) the list is seeded with a single m. // If needm finds that it has taken the last m off the list, its job // is - once it has installed its own m so that it can do things like // allocate memory - to create a spare m and put it on the list. // // Each of these extra m's also has a g0 and a curg that are // pressed into service as the scheduling stack and current // goroutine for the duration of the cgo callback. // // When the callback is done with the m, it calls dropm to // put the m back on the list. //go:nosplit func needm(x byte) { if iscgo && !cgoHasExtraM { // Can happen if C/C++ code calls Go from a global ctor. // Can not throw, because scheduler is not initialized yet. write(2, unsafe.Pointer(&earlycgocallback[0]), int32(len(earlycgocallback))) exit(1) } // Lock extra list, take head, unlock popped list. // nilokay=false is safe here because of the invariant above, // that the extra list always contains or will soon contain // at least one m. mp := lockextra(false) // Set needextram when we've just emptied the list, // so that the eventual call into cgocallbackg will // allocate a new m for the extra list. 
We delay the // allocation until then so that it can be done // after exitsyscall makes sure it is okay to be // running at all (that is, there's no garbage collection // running right now). mp.needextram = mp.schedlink == 0 unlockextra(mp.schedlink.ptr()) // Save and block signals before installing g. // Once g is installed, any incoming signals will try to execute, // but we won't have the sigaltstack settings and other data // set up appropriately until the end of minit, which will // unblock the signals. This is the same dance as when // starting a new m to run Go code via newosproc. msigsave(mp) sigblock() // Install g (= m->curg). setg(mp.curg) atomic.Store(&mp.curg.atomicstatus, _Gsyscall) setGContext() // Initialize this thread to use the m. minit() } var earlycgocallback = []byte("fatal error: cgo callback before cgo call\n") // newextram allocates m's and puts them on the extra list. // It is called with a working local m, so that it can do things // like call schedlock and allocate. func newextram() { c := atomic.Xchg(&extraMWaiters, 0) if c > 0 { for i := uint32(0); i < c; i++ { oneNewExtraM() } } else { // Make sure there is at least one extra M. mp := lockextra(true) unlockextra(mp) if mp == nil { oneNewExtraM() } } } // oneNewExtraM allocates an m and puts it on the extra list. func oneNewExtraM() { // Create extra goroutine locked to extra m. // The goroutine is the context in which the cgo callback will run. // The sched.pc will never be returned to, but setting it to // goexit makes clear to the traceback routines where // the goroutine stack ends. var g0SP unsafe.Pointer var g0SPSize uintptr mp := allocm(nil, true, &g0SP, &g0SPSize) gp := malg(true, false, nil, nil) gp.gcscanvalid = true // fresh G, so no dequeueRescan necessary gp.gcscandone = true gp.gcRescan = -1 // malg returns status as Gidle, change to Gdead before adding to allg // where GC will see it. // gccgo uses Gdead here, not Gsyscall, because the split // stack context is not initialized. casgstatus(gp, _Gidle, _Gdead) gp.m = mp mp.curg = gp mp.locked = _LockInternal mp.lockedg = gp gp.lockedm = mp gp.goid = int64(atomic.Xadd64(&sched.goidgen, 1)) // put on allg for garbage collector allgadd(gp) // The context for gp will be set up in needm. // Here we need to set the context for g0. makeGContext(mp.g0, g0SP, g0SPSize) // Add m to the extra list. mnext := lockextra(true) mp.schedlink.set(mnext) unlockextra(mp) } // dropm is called when a cgo callback has called needm but is now // done with the callback and returning back into the non-Go thread. // It puts the current m back onto the extra list. // // The main expense here is the call to signalstack to release the // m's signal stack, and then the call to needm on the next callback // from this thread. It is tempting to try to save the m for next time, // which would eliminate both these costs, but there might not be // a next time: the current thread (which Go does not control) might exit. // If we saved the m for that thread, there would be an m leak each time // such a thread exited. Instead, we acquire and release an m on each // call. These should typically not be scheduling operations, just a few // atomics, so the cost should be small. // // TODO(rsc): An alternative would be to allocate a dummy pthread per-thread // variable using pthread_key_create. Unlike the pthread keys we already use // on OS X, this dummy key would never be read by Go code. It would exist // only so that we could register at thread-exit-time destructor. 
// That destructor would put the m back onto the extra list. // This is purely a performance optimization. The current version, // in which dropm happens on each cgo call, is still correct too. // We may have to keep the current version on systems with cgo // but without pthreads, like Windows. func dropm() { // Clear m and g, and return m to the extra list. // After the call to setg we can only call nosplit functions // with no pointer manipulation. mp := getg().m // Block signals before unminit. // Unminit unregisters the signal handling stack (but needs g on some systems). // Setg(nil) clears g, which is the signal handler's cue not to run Go handlers. // It's important not to try to handle a signal between those two steps. sigmask := mp.sigmask sigblock() unminit() // gccgo sets the stack to Gdead here, because the splitstack // context is not initialized. mp.curg.atomicstatus = _Gdead mp.curg.gcstack = nil mp.curg.gcnextsp = nil mnext := lockextra(true) mp.schedlink.set(mnext) setg(nil) // Commit the release of mp. unlockextra(mp) msigrestore(sigmask) } // A helper function for EnsureDropM. func getm() uintptr { return uintptr(unsafe.Pointer(getg().m)) } var extram uintptr var extraMWaiters uint32 // lockextra locks the extra list and returns the list head. // The caller must unlock the list by storing a new list head // to extram. If nilokay is true, then lockextra will // return a nil list head if that's what it finds. If nilokay is false, // lockextra will keep waiting until the list head is no longer nil. //go:nosplit func lockextra(nilokay bool) *m { const locked = 1 incr := false for { old := atomic.Loaduintptr(&extram) if old == locked { yield := osyield yield() continue } if old == 0 && !nilokay { if !incr { // Add 1 to the number of threads // waiting for an M. // This is cleared by newextram. atomic.Xadd(&extraMWaiters, 1) incr = true } usleep(1) continue } if atomic.Casuintptr(&extram, old, locked) { return (*m)(unsafe.Pointer(old)) } yield := osyield yield() continue } } //go:nosplit func unlockextra(mp *m) { atomic.Storeuintptr(&extram, uintptr(unsafe.Pointer(mp))) } // Stops execution of the current m until new work is available. // Returns with acquired P. func stopm() { _g_ := getg() if _g_.m.locks != 0 { throw("stopm holding locks") } if _g_.m.p != 0 { throw("stopm holding p") } if _g_.m.spinning { throw("stopm spinning") } retry: lock(&sched.lock) mput(_g_.m) unlock(&sched.lock) notesleep(&_g_.m.park) noteclear(&_g_.m.park) if _g_.m.helpgc != 0 { gchelper() _g_.m.helpgc = 0 _g_.m.mcache = nil _g_.m.p = 0 goto retry } acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 } // Hands off P from syscall or locked M. // Always runs without a P, so write barriers are not allowed. //go:nowritebarrierrec func handoffp(_p_ *p) { // handoffp must start an M in any situation where // findrunnable would return a G to run on _p_. 
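	// Condensed into a single hypothetical predicate (eliding the
	// locking and the spinning-M CAS below), the agreement looks like:
	//
	//	func mustStartM(_p_ *p) bool {
	//		return !runqempty(_p_) || // local work
	//			sched.runqsize != 0 || // global work
	//			(gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_)) || // GC mark work
	//			sched.npidle == uint32(gomaxprocs-1) // last running P keeps netpoll covered
	//	}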
// if it has local work, start it straight away if !runqempty(_p_) || sched.runqsize != 0 { startm(_p_, false) return } // if it has GC work, start it straight away if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) { startm(_p_, false) return } // no local work, check that there are no spinning/idle M's, // otherwise our help is not required if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic startm(_p_, true) return } lock(&sched.lock) if sched.gcwaiting != 0 { _p_.status = _Pgcstop sched.stopwait-- if sched.stopwait == 0 { notewakeup(&sched.stopnote) } unlock(&sched.lock) return } if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) { sched.safePointFn(_p_) sched.safePointWait-- if sched.safePointWait == 0 { notewakeup(&sched.safePointNote) } } if sched.runqsize != 0 { unlock(&sched.lock) startm(_p_, false) return } // If this is the last running P and nobody is polling network, // need to wakeup another M to poll network. if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 { unlock(&sched.lock) startm(_p_, false) return } pidleput(_p_) unlock(&sched.lock) } // Tries to add one more P to execute G's. // Called when a G is made runnable (newproc, ready). func wakep() { // be conservative about spinning threads if !atomic.Cas(&sched.nmspinning, 0, 1) { return } startm(nil, true) } // Stops execution of the current m that is locked to a g until the g is runnable again. // Returns with acquired P. func stoplockedm() { _g_ := getg() if _g_.m.lockedg == nil || _g_.m.lockedg.lockedm != _g_.m { throw("stoplockedm: inconsistent locking") } if _g_.m.p != 0 { // Schedule another M to run this p. _p_ := releasep() handoffp(_p_) } incidlelocked(1) // Wait until another thread schedules lockedg again. notesleep(&_g_.m.park) noteclear(&_g_.m.park) status := readgstatus(_g_.m.lockedg) if status&^_Gscan != _Grunnable { print("runtime:stoplockedm: g is not Grunnable or Gscanrunnable\n") dumpgstatus(_g_) throw("stoplockedm: not runnable") } acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 } // Schedules the locked m to run the locked gp. // May run during STW, so write barriers are not allowed. //go:nowritebarrierrec func startlockedm(gp *g) { _g_ := getg() mp := gp.lockedm if mp == _g_.m { throw("startlockedm: locked to me") } if mp.nextp != 0 { throw("startlockedm: m has p") } // directly handoff current P to the locked m incidlelocked(-1) _p_ := releasep() mp.nextp.set(_p_) notewakeup(&mp.park) stopm() } // Stops the current m for stopTheWorld. // Returns when the world is restarted. func gcstopm() { _g_ := getg() if sched.gcwaiting == 0 { throw("gcstopm: not waiting for gc") } if _g_.m.spinning { _g_.m.spinning = false // OK to just drop nmspinning here, // startTheWorld will unpark threads as necessary. if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("gcstopm: negative nmspinning") } } _p_ := releasep() lock(&sched.lock) _p_.status = _Pgcstop sched.stopwait-- if sched.stopwait == 0 { notewakeup(&sched.stopnote) } unlock(&sched.lock) stopm() } // Schedules gp to run on the current M. // If inheritTime is true, gp inherits the remaining time in the // current time slice. Otherwise, it starts a new time slice. // Never returns. // // Write barriers are allowed because this is called immediately after // acquiring a P in several places. 
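// The time slice itself is accounted through p.schedtick: sysmon's
// retake preempts a P whose schedtick has not advanced within
// forcePreemptNS (10ms). Because the schedtick++ below is skipped when
// inheritTime is true, a goroutine that inherits the slice finishes its
// donor's 10ms rather than starting a fresh one.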
//
//go:yeswritebarrierrec
func execute(gp *g, inheritTime bool) {
	_g_ := getg()

	casgstatus(gp, _Grunnable, _Grunning)
	gp.waitsince = 0
	gp.preempt = false
	if !inheritTime {
		_g_.m.p.ptr().schedtick++
	}
	_g_.m.curg = gp
	gp.m = _g_.m

	// Check whether the profiler needs to be turned on or off.
	hz := sched.profilehz
	if _g_.m.profilehz != hz {
		resetcpuprofiler(hz)
	}

	if trace.enabled {
		// GoSysExit has to happen when we have a P, but before GoStart.
		// So we emit it here.
		if gp.syscallsp != 0 && gp.sysblocktraced {
			traceGoSysExit(gp.sysexitticks)
		}
		traceGoStart()
	}

	gogo(gp)
}

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()

	// The conditions here and in handoffp must agree: if
	// findrunnable would return a G to run, handoffp must start
	// an M.

top:
	_p_ := _g_.m.p.ptr()
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	if _p_.runSafePointFn != 0 {
		runSafePointFn()
	}
	if getfingwait() && getfingwake() {
		if gp := wakefing(); gp != nil {
			ready(gp, 0, true)
		}
	}

	// local runq
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}

	// global runq
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}

	// Poll network.
	// This netpoll is only an optimization before we resort to stealing.
	// We can safely skip it if there is a thread blocked in netpoll already.
	// If there is any kind of logical race with that blocked thread
	// (e.g. it has already returned from netpoll, but does not set lastpoll yet),
	// this thread will do blocking netpoll below anyway.
	if netpollinited() && sched.lastpoll != 0 {
		if gp := netpoll(false); gp != nil { // non-blocking
			// netpoll returns list of goroutines linked by schedlink.
			injectglist(gp.schedlink.ptr())
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.enabled {
				traceGoUnpark(gp, 0)
			}
			return gp, false
		}
	}

	// Steal work from other P's.
	procs := uint32(gomaxprocs)
	if atomic.Load(&sched.npidle) == procs-1 {
		// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
		// New work can appear from returning syscall/cgocall, network or timers.
		// None of those submits to local run queues, so no point in stealing.
		goto stop
	}
	// If number of spinning M's >= number of busy P's, block.
	// This is necessary to prevent excessive CPU consumption
	// when GOMAXPROCS>>1 but the program parallelism is low.
	if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
		goto stop
	}
	if !_g_.m.spinning {
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning, 1)
	}
	for i := 0; i < 4; i++ {
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
				return gp, false
			}
		}
	}

stop:

	// We have nothing to do. If we're in the GC mark phase, can
	// safely scan and blacken objects, and have work to do, run
	// idle-time marking rather than give up the P.
if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) { _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode gp := _p_.gcBgMarkWorker.ptr() casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } // return P and block lock(&sched.lock) if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 { unlock(&sched.lock) goto top } if sched.runqsize != 0 { gp := globrunqget(_p_, 0) unlock(&sched.lock) return gp, false } if releasep() != _p_ { throw("findrunnable: wrong p") } pidleput(_p_) unlock(&sched.lock) // Delicate dance: thread transitions from spinning to non-spinning state, // potentially concurrently with submission of new goroutines. We must // drop nmspinning first and then check all per-P queues again (with // #StoreLoad memory barrier in between). If we do it the other way around, // another thread can submit a goroutine after we've checked all run queues // but before we drop nmspinning; as the result nobody will unpark a thread // to run the goroutine. // If we discover new work below, we need to restore m.spinning as a signal // for resetspinning to unpark a new worker thread (because there can be more // than one starving goroutine). However, if after discovering new work // we also observe no idle Ps, it is OK to just park the current thread: // the system is fully loaded so no spinning threads are required. // Also see "Worker thread parking/unparking" comment at the top of the file. wasSpinning := _g_.m.spinning if _g_.m.spinning { _g_.m.spinning = false if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("findrunnable: negative nmspinning") } } // check all runqueues once again for i := 0; i < int(gomaxprocs); i++ { _p_ := allp[i] if _p_ != nil && !runqempty(_p_) { lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ != nil { acquirep(_p_) if wasSpinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } goto top } break } } // Check for idle-priority GC work again. if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) { lock(&sched.lock) _p_ = pidleget() if _p_ != nil && _p_.gcBgMarkWorker == 0 { pidleput(_p_) _p_ = nil } unlock(&sched.lock) if _p_ != nil { acquirep(_p_) if wasSpinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } // Go back to idle GC check. goto stop } } // poll network if netpollinited() && atomic.Xchg64(&sched.lastpoll, 0) != 0 { if _g_.m.p != 0 { throw("findrunnable: netpoll with p") } if _g_.m.spinning { throw("findrunnable: netpoll with spinning") } gp := netpoll(true) // block until new work is available atomic.Store64(&sched.lastpoll, uint64(nanotime())) if gp != nil { lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ != nil { acquirep(_p_) injectglist(gp.schedlink.ptr()) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } injectglist(gp) } } stopm() goto top } // pollWork returns true if there is non-background work this P could // be doing. This is a fairly lightweight check to be used for // background work loops, like idle GC. It checks a subset of the // conditions checked by the actual scheduler. 
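// The "delicate dance" above is an instance of a general submit/park
// protocol: a parking worker must drop its spinning count first and
// only then re-check the queues, while a submitter publishes work first
// and only then checks for spinners. A stand-alone, user-level sketch
// (wakeWorker and sleepUntilWoken are hypothetical helpers, and the
// sync/atomic calls provide the ordering between the two steps):
//
//	var spinning int32
//	var queue = make(chan func(), 128)
//
//	func submit(w func()) {
//		queue <- w // publish work first
//		if atomic.LoadInt32(&spinning) == 0 {
//			wakeWorker()
//		}
//	}
//
//	func park() {
//		atomic.AddInt32(&spinning, -1) // drop spinning first
//		select {
//		case w := <-queue: // ...then re-check for work
//			atomic.AddInt32(&spinning, 1)
//			w()
//		default:
//			sleepUntilWoken()
//		}
//	}
//
// Whichever side acts second is guaranteed to observe the other's
// first step, so a submitted goroutine cannot be stranded.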
func pollWork() bool { if sched.runqsize != 0 { return true } p := getg().m.p.ptr() if !runqempty(p) { return true } if netpollinited() && sched.lastpoll != 0 { if gp := netpoll(false); gp != nil { injectglist(gp) return true } } return false } func resetspinning() { _g_ := getg() if !_g_.m.spinning { throw("resetspinning: not a spinning m") } _g_.m.spinning = false nmspinning := atomic.Xadd(&sched.nmspinning, -1) if int32(nmspinning) < 0 { throw("findrunnable: negative nmspinning") } // M wakeup policy is deliberately somewhat conservative, so check if we // need to wakeup another P here. See "Worker thread parking/unparking" // comment at the top of the file for details. if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 { wakep() } } // Injects the list of runnable G's into the scheduler. // Can run concurrently with GC. func injectglist(glist *g) { if glist == nil { return } if trace.enabled { for gp := glist; gp != nil; gp = gp.schedlink.ptr() { traceGoUnpark(gp, 0) } } lock(&sched.lock) var n int for n = 0; glist != nil; n++ { gp := glist glist = gp.schedlink.ptr() casgstatus(gp, _Gwaiting, _Grunnable) globrunqput(gp) } unlock(&sched.lock) for ; n != 0 && sched.npidle != 0; n-- { startm(nil, false) } } // One round of scheduler: find a runnable goroutine and execute it. // Never returns. func schedule() { _g_ := getg() if _g_.m.locks != 0 { throw("schedule: holding locks") } if _g_.m.lockedg != nil { stoplockedm() execute(_g_.m.lockedg, false) // Never returns. } top: if sched.gcwaiting != 0 { gcstopm() goto top } if _g_.m.p.ptr().runSafePointFn != 0 { runSafePointFn() } var gp *g var inheritTime bool if trace.enabled || trace.shutdown { gp = traceReader() if gp != nil { casgstatus(gp, _Gwaiting, _Grunnable) traceGoUnpark(gp, 0) } } if gp == nil && gcBlackenEnabled != 0 { gp = gcController.findRunnableGCWorker(_g_.m.p.ptr()) } if gp == nil { // Check the global runnable queue once in a while to ensure fairness. // Otherwise two goroutines can completely occupy the local runqueue // by constantly respawning each other. if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } } if gp == nil { gp, inheritTime = runqget(_g_.m.p.ptr()) if gp != nil && _g_.m.spinning { throw("schedule: spinning with local work") } // Because gccgo does not implement preemption as a stack check, // we need to check for preemption here for fairness. // Otherwise goroutines on the local queue may starve // goroutines on the global queue. // Since we preempt by storing the goroutine on the global // queue, this is the only place we need to check preempt. if gp != nil && gp.preempt { gp.preempt = false lock(&sched.lock) globrunqput(gp) unlock(&sched.lock) goto top } } if gp == nil { gp, inheritTime = findrunnable() // blocks until work is available } // This thread is going to run a goroutine and is not spinning anymore, // so if it was marked as spinning we need to reset it now and potentially // start a new spinning M. if _g_.m.spinning { resetspinning() } if gp.lockedm != nil { // Hands off own p to the locked m, // then blocks waiting for a new p. startlockedm(gp) goto top } execute(gp, inheritTime) } // dropg removes the association between m and the current goroutine m->curg (gp for short). // Typically a caller sets gp's status away from Grunning and then // immediately calls dropg to finish the job. The caller is also responsible // for arranging that gp will be restarted using ready at an // appropriate time. 
// After calling dropg and arranging for gp to be
// readied later, the caller can do other work but eventually should
// call schedule to restart the scheduling of goroutines on this m.
func dropg() {
	_g_ := getg()

	setMNoWB(&_g_.m.curg.m, nil)
	setGNoWB(&_g_.m.curg, nil)
}

func beforefork() {
	gp := getg().m.curg

	// Fork can hang if preempted with signals frequently enough (see issue 5517).
	// Ensure that we stay on the same M where we disable profiling.
	gp.m.locks++
	if gp.m.profilehz != 0 {
		resetcpuprofiler(0)
	}
}

// Called from syscall package before fork.
//go:linkname syscall_runtime_BeforeFork syscall.runtime_BeforeFork
//go:nosplit
func syscall_runtime_BeforeFork() {
	systemstack(beforefork)
}

func afterfork() {
	gp := getg().m.curg

	hz := sched.profilehz
	if hz != 0 {
		resetcpuprofiler(hz)
	}
	gp.m.locks--
}

// Called from syscall package after fork in parent.
//go:linkname syscall_runtime_AfterFork syscall.runtime_AfterFork
//go:nosplit
func syscall_runtime_AfterFork() {
	systemstack(afterfork)
}

// Put on gfree list.
// If local list is too long, transfer a batch to the global list.
func gfput(_p_ *p, gp *g) {
	if readgstatus(gp) != _Gdead {
		throw("gfput: bad status (not Gdead)")
	}

	gp.schedlink.set(_p_.gfree)
	_p_.gfree = gp
	_p_.gfreecnt++
	if _p_.gfreecnt >= 64 {
		lock(&sched.gflock)
		for _p_.gfreecnt >= 32 {
			_p_.gfreecnt--
			gp = _p_.gfree
			_p_.gfree = gp.schedlink.ptr()
			gp.schedlink.set(sched.gfree)
			sched.gfree = gp
			sched.ngfree++
		}
		unlock(&sched.gflock)
	}
}

// Get from gfree list.
// If local list is empty, grab a batch from global list.
func gfget(_p_ *p) *g {
retry:
	gp := _p_.gfree
	if gp == nil && sched.gfree != nil {
		lock(&sched.gflock)
		for _p_.gfreecnt < 32 {
			if sched.gfree != nil {
				gp = sched.gfree
				sched.gfree = gp.schedlink.ptr()
			} else {
				break
			}
			_p_.gfreecnt++
			sched.ngfree--
			gp.schedlink.set(_p_.gfree)
			_p_.gfree = gp
		}
		unlock(&sched.gflock)
		goto retry
	}
	if gp != nil {
		_p_.gfree = gp.schedlink.ptr()
		_p_.gfreecnt--
	}
	return gp
}

// Purge all cached G's from gfree list to the global list.
func gfpurge(_p_ *p) {
	lock(&sched.gflock)
	for _p_.gfreecnt != 0 {
		_p_.gfreecnt--
		gp := _p_.gfree
		_p_.gfree = gp.schedlink.ptr()
		gp.schedlink.set(sched.gfree)
		sched.gfree = gp
		sched.ngfree++
	}
	unlock(&sched.gflock)
}

// dolockOSThread is called by LockOSThread and lockOSThread below
// after they modify m.locked. Do not allow preemption during this call,
// or else the m might be different in this function than in the caller.
//go:nosplit
func dolockOSThread() {
	_g_ := getg()
	_g_.m.lockedg = _g_
	_g_.lockedm = _g_.m
}

//go:nosplit

// LockOSThread wires the calling goroutine to its current operating system thread.
// Until the calling goroutine exits or calls UnlockOSThread, it will always
// execute in that thread, and no other goroutine can.
func LockOSThread() {
	getg().m.locked |= _LockExternal
	dolockOSThread()
}

//go:nosplit
func lockOSThread() {
	getg().m.locked += _LockInternal
	dolockOSThread()
}

// dounlockOSThread is called by UnlockOSThread and unlockOSThread below
// after they update m->locked. Do not allow preemption during this call,
// or else the m might be different in this function than in the caller.
//go:nosplit
func dounlockOSThread() {
	_g_ := getg()
	if _g_.m.locked != 0 {
		return
	}
	_g_.m.lockedg = nil
	_g_.lockedm = nil
}

//go:nosplit

// UnlockOSThread unwires the calling goroutine from its fixed operating system thread.
// If the calling goroutine has not called LockOSThread, UnlockOSThread is a no-op.
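// A typical pairing, for code that must stay on one OS thread, such as
// a C library keeping per-thread state (threadSensitiveCall is a
// hypothetical stand-in):
//
//	go func() {
//		runtime.LockOSThread()
//		defer runtime.UnlockOSThread()
//		for task := range tasks {
//			threadSensitiveCall(task)
//		}
//	}()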
func UnlockOSThread() { getg().m.locked &^= _LockExternal dounlockOSThread() } //go:nosplit func unlockOSThread() { _g_ := getg() if _g_.m.locked < _LockInternal { systemstack(badunlockosthread) } _g_.m.locked -= _LockInternal dounlockOSThread() } func badunlockosthread() { throw("runtime: internal error: misuse of lockOSThread/unlockOSThread") } func gcount() int32 { n := int32(allglen) - sched.ngfree - int32(atomic.Load(&sched.ngsys)) for i := 0; ; i++ { _p_ := allp[i] if _p_ == nil { break } n -= _p_.gfreecnt } // All these variables can be changed concurrently, so the result can be inconsistent. // But at least the current goroutine is running. if n < 1 { n = 1 } return n } func mcount() int32 { return sched.mcount } // Change number of processors. The world is stopped, sched is locked. // gcworkbufs are not being modified by either the GC or // the write barrier code. // Returns list of Ps with local work, they need to be scheduled by the caller. func procresize(nprocs int32) *p { old := gomaxprocs if old < 0 || old > _MaxGomaxprocs || nprocs <= 0 || nprocs > _MaxGomaxprocs { throw("procresize: invalid arg") } if trace.enabled { traceGomaxprocs(nprocs) } // update statistics now := nanotime() if sched.procresizetime != 0 { sched.totaltime += int64(old) * (now - sched.procresizetime) } sched.procresizetime = now // initialize new P's for i := int32(0); i < nprocs; i++ { pp := allp[i] if pp == nil { pp = new(p) pp.id = i pp.status = _Pgcstop pp.sudogcache = pp.sudogbuf[:0] pp.deferpool = pp.deferpoolbuf[:0] atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) } if pp.mcache == nil { if old == 0 && i == 0 { if getg().m.mcache == nil { throw("missing mcache?") } pp.mcache = getg().m.mcache // bootstrap } else { pp.mcache = allocmcache() } } } // free unused P's for i := nprocs; i < old; i++ { p := allp[i] if trace.enabled { if p == getg().m.p.ptr() { // moving to p[0], pretend that we were descheduled // and then scheduled again to keep the trace sane. traceGoSched() traceProcStop(p) } } // move all runnable goroutines to the global queue for p.runqhead != p.runqtail { // pop from tail of local queue p.runqtail-- gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr() // push onto head of global queue globrunqputhead(gp) } if p.runnext != 0 { globrunqputhead(p.runnext.ptr()) p.runnext = 0 } // if there's a background worker, make it runnable and put // it on the global queue so it can clean itself up if gp := p.gcBgMarkWorker.ptr(); gp != nil { casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } globrunqput(gp) // This assignment doesn't race because the // world is stopped. 
p.gcBgMarkWorker.set(nil) } for i := range p.sudogbuf { p.sudogbuf[i] = nil } p.sudogcache = p.sudogbuf[:0] for i := range p.deferpoolbuf { p.deferpoolbuf[i] = nil } p.deferpool = p.deferpoolbuf[:0] freemcache(p.mcache) p.mcache = nil gfpurge(p) traceProcFree(p) p.status = _Pdead // can't free P itself because it can be referenced by an M in syscall } _g_ := getg() if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { // continue to use the current P _g_.m.p.ptr().status = _Prunning } else { // release the current P and acquire allp[0] if _g_.m.p != 0 { _g_.m.p.ptr().m = 0 } _g_.m.p = 0 _g_.m.mcache = nil p := allp[0] p.m = 0 p.status = _Pidle acquirep(p) if trace.enabled { traceGoStart() } } var runnablePs *p for i := nprocs - 1; i >= 0; i-- { p := allp[i] if _g_.m.p.ptr() == p { continue } p.status = _Pidle if runqempty(p) { pidleput(p) } else { p.m.set(mget()) p.link.set(runnablePs) runnablePs = p } } stealOrder.reset(uint32(nprocs)) var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32 atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs)) return runnablePs } // Associate p and the current m. // // This function is allowed to have write barriers even if the caller // isn't because it immediately acquires _p_. // //go:yeswritebarrierrec func acquirep(_p_ *p) { // Do the part that isn't allowed to have write barriers. acquirep1(_p_) // have p; write barriers now allowed _g_ := getg() _g_.m.mcache = _p_.mcache if trace.enabled { traceProcStart() } } // acquirep1 is the first step of acquirep, which actually acquires // _p_. This is broken out so we can disallow write barriers for this // part, since we don't yet have a P. // //go:nowritebarrierrec func acquirep1(_p_ *p) { _g_ := getg() if _g_.m.p != 0 || _g_.m.mcache != nil { throw("acquirep: already in go") } if _p_.m != 0 || _p_.status != _Pidle { id := int32(0) if _p_.m != 0 { id = _p_.m.ptr().id } print("acquirep: p->m=", _p_.m, "(", id, ") p->status=", _p_.status, "\n") throw("acquirep: invalid p state") } _g_.m.p.set(_p_) _p_.m.set(_g_.m) _p_.status = _Prunning } // Disassociate p and the current m. func releasep() *p { _g_ := getg() if _g_.m.p == 0 || _g_.m.mcache == nil { throw("releasep: invalid arg") } _p_ := _g_.m.p.ptr() if _p_.m.ptr() != _g_.m || _p_.mcache != _g_.m.mcache || _p_.status != _Prunning { print("releasep: m=", _g_.m, " m->p=", _g_.m.p.ptr(), " p->m=", _p_.m, " m->mcache=", _g_.m.mcache, " p->mcache=", _p_.mcache, " p->status=", _p_.status, "\n") throw("releasep: invalid p state") } if trace.enabled { traceProcStop(_g_.m.p.ptr()) } _g_.m.p = 0 _g_.m.mcache = nil _p_.m = 0 _p_.status = _Pidle return _p_ } func incidlelocked(v int32) { lock(&sched.lock) sched.nmidlelocked += v if v > 0 { checkdead() } unlock(&sched.lock) } // Check for deadlock situation. // The check is based on number of running M's, if 0 -> deadlock. func checkdead() { // For -buildmode=c-shared or -buildmode=c-archive it's OK if // there are no running goroutines. The calling program is // assumed to be running. if islibrary || isarchive { return } // If we are dying because of a signal caught on an already idle thread, // freezetheworld will cause all running threads to block. // And runtime will essentially enter into deadlock state, // except that there is a thread that will call exit soon. 
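	// For reference, the classic way to reach the throw below is a
	// program whose remaining goroutines all block on one another:
	//
	//	func main() {
	//		ch := make(chan int)
	//		<-ch // nothing can ever send; the runtime throws
	//		     // "all goroutines are asleep - deadlock!"
	//	}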
	if panicking > 0 {
		return
	}

	// -1 for sysmon
	run := sched.mcount - sched.nmidle - sched.nmidlelocked - 1
	if run > 0 {
		return
	}
	if run < 0 {
		print("runtime: checkdead: nmidle=", sched.nmidle, " nmidlelocked=", sched.nmidlelocked, " mcount=", sched.mcount, "\n")
		throw("checkdead: inconsistent counts")
	}

	grunning := 0
	lock(&allglock)
	for i := 0; i < len(allgs); i++ {
		gp := allgs[i]
		if isSystemGoroutine(gp) {
			continue
		}
		s := readgstatus(gp)
		switch s &^ _Gscan {
		case _Gwaiting:
			grunning++
		case _Grunnable, _Grunning, _Gsyscall:
			unlock(&allglock)
			print("runtime: checkdead: find g ", gp.goid, " in status ", s, "\n")
			throw("checkdead: runnable g")
		}
	}
	unlock(&allglock)
	if grunning == 0 {
		// possible if main goroutine calls runtime·Goexit()
		throw("no goroutines (main called runtime.Goexit) - deadlock!")
	}

	// Maybe jump time forward for playground.
	gp := timejump()
	if gp != nil {
		casgstatus(gp, _Gwaiting, _Grunnable)
		globrunqput(gp)
		_p_ := pidleget()
		if _p_ == nil {
			throw("checkdead: no p for timer")
		}
		mp := mget()
		if mp == nil {
			// There should always be a free M since
			// nothing is running.
			throw("checkdead: no m for timer")
		}
		mp.nextp.set(_p_)
		notewakeup(&mp.park)
		return
	}

	getg().m.throwing = -1 // do not dump full stacks
	throw("all goroutines are asleep - deadlock!")
}

// forcegcperiod is the maximum time in nanoseconds between garbage
// collections. If we go this long without a garbage collection, one
// is forced to run.
//
// This is a variable for testing purposes. It normally doesn't change.
var forcegcperiod int64 = 2 * 60 * 1e9

// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
	// If a heap span goes unused for 5 minutes after a garbage collection,
	// we hand it back to the operating system.
	scavengelimit := int64(5 * 60 * 1e9)

	if debug.scavenge > 0 {
		// Scavenge-a-lot for testing.
		forcegcperiod = 10 * 1e6
		scavengelimit = 20 * 1e6
	}

	lastscavenge := nanotime()
	nscavenge := 0

	lasttrace := int64(0)
	idle := 0 // how many cycles in succession we had not woken anybody up
	delay := uint32(0)
	for {
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)
		if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
			lock(&sched.lock)
			if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
				atomic.Store(&sched.sysmonwait, 1)
				unlock(&sched.lock)
				// Make wake-up period small enough
				// for the sampling to be correct.
				maxsleep := forcegcperiod / 2
				if scavengelimit < forcegcperiod {
					maxsleep = scavengelimit / 2
				}
				notetsleep(&sched.sysmonnote, maxsleep)
				lock(&sched.lock)
				atomic.Store(&sched.sysmonwait, 0)
				noteclear(&sched.sysmonnote)
				idle = 0
				delay = 20
			}
			unlock(&sched.lock)
		}
		// poll network if not polled for more than 10ms
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		now := nanotime()
		unixnow := unixnanotime()
		if lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			gp := netpoll(false) // non-blocking - returns list of goroutines
			if gp != nil {
				// Need to decrement number of idle locked M's
				// (pretending that one more is running) before injectglist.
				// Otherwise it can lead to the following situation:
				// injectglist grabs all P's but before it starts M's to run the P's,
				// another M returns from syscall, finishes running its G,
				// observes that there is no work to do and no other running M's
				// and reports deadlock.
				incidlelocked(-1)
				injectglist(gp)
				incidlelocked(1)
			}
		}
		// retake P's blocked in syscalls
		// and preempt long running G's
		if retake(now) != 0 {
			idle = 0
		} else {
			idle++
		}
		// check if we need to force a GC
		lastgc := int64(atomic.Load64(&memstats.last_gc))
		if gcphase == _GCoff && lastgc != 0 && unixnow-lastgc > forcegcperiod && atomic.Load(&forcegc.idle) != 0 {
			lock(&forcegc.lock)
			forcegc.idle = 0
			forcegc.g.schedlink = 0
			injectglist(forcegc.g)
			unlock(&forcegc.lock)
		}
		// scavenge heap once in a while
		if lastscavenge+scavengelimit/2 < now {
			mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
			lastscavenge = now
			nscavenge++
		}
		if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
			lasttrace = now
			schedtrace(debug.scheddetail > 0)
		}
	}
}

var pdesc [_MaxGomaxprocs]struct {
	schedtick   uint32
	schedwhen   int64
	syscalltick uint32
	syscallwhen int64
}

// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
	n := 0
	for i := int32(0); i < gomaxprocs; i++ {
		_p_ := allp[i]
		if _p_ == nil {
			continue
		}
		pd := &pdesc[i]
		s := _p_.status
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			t := int64(_p_.syscalltick)
			if int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			// On the one hand we don't want to retake Ps if there is no other work to do,
			// but on the other hand we want to retake them eventually
			// because they can prevent the sysmon thread from deep sleep.
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) {
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
				handoffp(_p_)
			}
			incidlelocked(1)
		} else if s == _Prunning {
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
				continue
			}
			if pd.schedwhen+forcePreemptNS > now {
				continue
			}
			preemptone(_p_)
		}
	}
	return uint32(n)
}

// Tell all goroutines that they have been preempted and they should stop.
// This function is purely best-effort. It can fail to inform a goroutine if a
// processor just started running it.
// No locks need to be held.
// Returns true if preemption request was issued to at least one goroutine.
func preemptall() bool {
	res := false
	for i := int32(0); i < gomaxprocs; i++ {
		_p_ := allp[i]
		if _p_ == nil || _p_.status != _Prunning {
			continue
		}
		if preemptone(_p_) {
			res = true
		}
	}
	return res
}

// Tell the goroutine running on processor P to stop.
// This function is purely best-effort. It can incorrectly fail to inform the
// goroutine. It can inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
// Returns true if preemption request was issued.
// The actual preemption will happen at some point in the future
// and will be indicated by gp.status no longer being _Grunning.
func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}

	gp.preempt = true

	// At this point the gc implementation sets gp.stackguard0 to
	// a value that causes the goroutine to suspend itself.
	// gccgo has no support for this, and it's hard to support.
	// The split stack code reads a value from its TCB.
	// We have no way to set a value in the TCB of a different thread.
	// And, of course, not all systems support split stack anyhow.
	// Checking the field in the g is expensive, since it requires
	// loading the g from TLS. The best mechanism is likely to be
	// setting a global variable and figuring out a way to efficiently
	// check that global variable.
	//
	// For now we check gp.preempt in schedule and mallocgc,
	// which is at least better than doing nothing at all.

	return true
}

var starttime int64

func schedtrace(detailed bool) {
	now := nanotime()
	if starttime == 0 {
		starttime = now
	}

	lock(&sched.lock)
	print("SCHED ", (now-starttime)/1e6, "ms: gomaxprocs=", gomaxprocs, " idleprocs=", sched.npidle, " threads=", sched.mcount, " spinningthreads=", sched.nmspinning, " idlethreads=", sched.nmidle, " runqueue=", sched.runqsize)
	if detailed {
		print(" gcwaiting=", sched.gcwaiting, " nmidlelocked=", sched.nmidlelocked, " stopwait=", sched.stopwait, " sysmonwait=", sched.sysmonwait, "\n")
	}
	// We must be careful while reading data from P's, M's and G's.
	// Even if we hold schedlock, most data can be changed concurrently.
	// E.g. (p->m ? p->m->id : -1) can crash if p->m changes from non-nil to nil.
	for i := int32(0); i < gomaxprocs; i++ {
		_p_ := allp[i]
		if _p_ == nil {
			continue
		}
		mp := _p_.m.ptr()
		h := atomic.Load(&_p_.runqhead)
		t := atomic.Load(&_p_.runqtail)
		if detailed {
			id := int32(-1)
			if mp != nil {
				id = mp.id
			}
			print(" P", i, ": status=", _p_.status, " schedtick=", _p_.schedtick, " syscalltick=", _p_.syscalltick, " m=", id, " runqsize=", t-h, " gfreecnt=", _p_.gfreecnt, "\n")
		} else {
			// In non-detailed mode format lengths of per-P run queues as:
			// [len1 len2 len3 len4]
			print(" ")
			if i == 0 {
				print("[")
			}
			print(t - h)
			if i == gomaxprocs-1 {
				print("]\n")
			}
		}
	}

	if !detailed {
		unlock(&sched.lock)
		return
	}

	for mp := allm; mp != nil; mp = mp.alllink {
		_p_ := mp.p.ptr()
		gp := mp.curg
		lockedg := mp.lockedg
		id1 := int32(-1)
		if _p_ != nil {
			id1 = _p_.id
		}
		id2 := int64(-1)
		if gp != nil {
			id2 = gp.goid
		}
		id3 := int64(-1)
		if lockedg != nil {
			id3 = lockedg.goid
		}
		print(" M", mp.id, ": p=", id1, " curg=", id2, " mallocing=", mp.mallocing, " throwing=", mp.throwing, " preemptoff=", mp.preemptoff, " locks=", mp.locks, " dying=", mp.dying, " helpgc=", mp.helpgc, " spinning=", mp.spinning, " blocked=", mp.blocked, " lockedg=", id3, "\n")
	}

	lock(&allglock)
	for gi := 0; gi < len(allgs); gi++ {
		gp := allgs[gi]
		mp := gp.m
		lockedm := gp.lockedm
		id1 := int32(-1)
		if mp != nil {
			id1 = mp.id
		}
		id2 := int32(-1)
		if lockedm != nil {
			id2 = lockedm.id
		}
		print(" G", gp.goid, ": status=", readgstatus(gp), "(", gp.waitreason, ") m=", id1, " lockedm=", id2, "\n")
	}
	unlock(&allglock)
	unlock(&sched.lock)
}

// Put mp on midle list.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrierrec
func mput(mp *m) {
	mp.schedlink = sched.midle
	sched.midle.set(mp)
	sched.nmidle++
	checkdead()
}

// Try to get an m from midle list.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrierrec
func mget() *m {
	mp := sched.midle.ptr()
	if mp != nil {
		sched.midle = mp.schedlink
		sched.nmidle--
	}
	return mp
}

// Put gp on the global runnable queue.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrierrec
func globrunqput(gp *g) {
	gp.schedlink = 0
	if sched.runqtail != 0 {
		sched.runqtail.ptr().schedlink.set(gp)
	} else {
		sched.runqhead.set(gp)
	}
	sched.runqtail.set(gp)
	sched.runqsize++
}

// Put gp at the head of the global runnable queue.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrierrec
func globrunqputhead(gp *g) {
	gp.schedlink = sched.runqhead
	sched.runqhead.set(gp)
	if sched.runqtail == 0 {
		sched.runqtail.set(gp)
	}
	sched.runqsize++
}

// Put a batch of runnable goroutines on the global runnable queue.
// Sched must be locked.
func globrunqputbatch(ghead *g, gtail *g, n int32) {
	gtail.schedlink = 0
	if sched.runqtail != 0 {
		sched.runqtail.ptr().schedlink.set(ghead)
	} else {
		sched.runqhead.set(ghead)
	}
	sched.runqtail.set(gtail)
	sched.runqsize += n
}

// Try to get a batch of G's from the global runnable queue.
// Sched must be locked.
func globrunqget(_p_ *p, max int32) *g {
	if sched.runqsize == 0 {
		return nil
	}

	n := sched.runqsize/gomaxprocs + 1
	if n > sched.runqsize {
		n = sched.runqsize
	}
	if max > 0 && n > max {
		n = max
	}
	if n > int32(len(_p_.runq))/2 {
		n = int32(len(_p_.runq)) / 2
	}

	sched.runqsize -= n
	if sched.runqsize == 0 {
		sched.runqtail = 0
	}

	gp := sched.runqhead.ptr()
	sched.runqhead = gp.schedlink
	n--
	for ; n > 0; n-- {
		gp1 := sched.runqhead.ptr()
		sched.runqhead = gp1.schedlink
		runqput(_p_, gp1, false)
	}
	return gp
}

// Put p on the _Pidle list.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrierrec
func pidleput(_p_ *p) {
	if !runqempty(_p_) {
		throw("pidleput: P has non-empty run queue")
	}
	_p_.link = sched.pidle
	sched.pidle.set(_p_)
	atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic
}

// Try to get a p from the _Pidle list.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrierrec
func pidleget() *p {
	_p_ := sched.pidle.ptr()
	if _p_ != nil {
		sched.pidle = _p_.link
		atomic.Xadd(&sched.npidle, -1) // TODO: fast atomic
	}
	return _p_
}

// runqempty returns true if _p_ has no Gs on its local run queue.
// It never returns true spuriously.
func runqempty(_p_ *p) bool {
	// Defend against a race where 1) _p_ has G1 in runqnext but runqhead == runqtail,
	// 2) runqput on _p_ kicks G1 to the runq, 3) runqget on _p_ empties runqnext.
	// Simply observing that runqhead == runqtail and then observing that runqnext == nil
	// does not mean the queue is empty.
	for {
		head := atomic.Load(&_p_.runqhead)
		tail := atomic.Load(&_p_.runqtail)
		runnext := atomic.Loaduintptr((*uintptr)(unsafe.Pointer(&_p_.runnext)))
		if tail == atomic.Load(&_p_.runqtail) {
			return head == tail && runnext == 0
		}
	}
}

// To shake out latent assumptions about scheduling order,
// we introduce some randomness into scheduling decisions
// when running with the race detector.
// The need for this was made obvious by changing the
// (deterministic) scheduling order in Go 1.5 and breaking
// many poorly-written tests.
// With the randomness here, as long as the tests pass
// consistently with -race, they shouldn't have latent scheduling
// assumptions.
const randomizeScheduler = raceenabled

// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runqput puts g on the global queue.
// Executed only by the owner P.
func runqput(_p_ *p, gp *g, next bool) {
	if randomizeScheduler && next && fastrand()%2 == 0 {
		next = false
	}

	if next {
	retryNext:
		oldnext := _p_.runnext
		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = oldnext.ptr()
	}

retry:
	h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
	t := _p_.runqtail
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
		return
	}
	if runqputslow(_p_, gp, h, t) {
		return
	}
	// the queue is not full, now the put above must succeed
	goto retry
}

// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
	var batch [len(_p_.runq)/2 + 1]*g

	// First, grab a batch from local queue.
	n := t - h
	n = n / 2
	if n != uint32(len(_p_.runq)/2) {
		throw("runqputslow: queue is not full")
	}
	for i := uint32(0); i < n; i++ {
		batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
	}
	if !atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
		return false
	}
	batch[n] = gp

	if randomizeScheduler {
		for i := uint32(1); i <= n; i++ {
			j := fastrand() % (i + 1)
			batch[i], batch[j] = batch[j], batch[i]
		}
	}

	// Link the goroutines.
	for i := uint32(0); i < n; i++ {
		batch[i].schedlink.set(batch[i+1])
	}

	// Now put the batch on global queue.
	lock(&sched.lock)
	globrunqputbatch(batch[0], batch[n], int32(n+1))
	unlock(&sched.lock)
	return true
}

// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
	// If there's a runnext, it's the next G to run.
	for {
		next := _p_.runnext
		if next == 0 {
			break
		}
		if _p_.runnext.cas(next, 0) {
			return next.ptr(), true
		}
	}

	for {
		h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers
		t := _p_.runqtail
		if t == h {
			return nil, false
		}
		gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
		if atomic.Cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume
			return gp, false
		}
	}
}

// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// Can be executed by any P.
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
	for {
		h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers
		t := atomic.Load(&_p_.runqtail) // load-acquire, synchronize with the producer
		n := t - h
		n = n - n/2
		if n == 0 {
			if stealRunNextG {
				// Try to steal from _p_.runnext.
				if next := _p_.runnext; next != 0 {
					// Sleep to ensure that _p_ isn't about to run the g we
					// are about to steal.
					// The important use case here is when the g running on _p_
					// ready()s another g and then almost immediately blocks.
					// Instead of stealing runnext in this window, back off
					// to give _p_ a chance to schedule runnext. This will avoid
					// thrashing gs between different Ps.
					// A sync chan send/recv takes ~50ns as of time of writing,
					// so 3us gives ~50x overshoot.
					if GOOS != "windows" {
						usleep(3)
					} else {
						// On Windows, system timer granularity is 1-15ms,
						// which is way too much for this optimization.
						// So just yield.
						osyield()
					}
					if !_p_.runnext.cas(next, 0) {
						continue
					}
					batch[batchHead%uint32(len(batch))] = next
					return 1
				}
			}
			return 0
		}
		if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
			continue
		}
		for i := uint32(0); i < n; i++ {
			g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
			batch[(batchHead+i)%uint32(len(batch))] = g
		}
		if atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
			return n
		}
	}
}

// Steal half of the elements from the local runnable queue of p2
// and put them onto the local runnable queue of _p_.
// Returns one of the stolen elements (or nil if it failed).
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
	t := _p_.runqtail
	n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
	if n == 0 {
		return nil
	}
	n--
	gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
	if n == 0 {
		return gp
	}
	h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
	if t-h+n >= uint32(len(_p_.runq)) {
		throw("runqsteal: runq overflow")
	}
	atomic.Store(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
	return gp
}

//go:linkname setMaxThreads runtime_debug.setMaxThreads
func setMaxThreads(in int) (out int) {
	lock(&sched.lock)
	out = int(sched.maxmcount)
	if in > 0x7fffffff { // MaxInt32
		sched.maxmcount = 0x7fffffff
	} else {
		sched.maxmcount = int32(in)
	}
	checkmcount()
	unlock(&sched.lock)
	return
}

//go:nosplit
func procPin() int {
	_g_ := getg()
	mp := _g_.m

	mp.locks++
	return int(mp.p.ptr().id)
}

//go:nosplit
func procUnpin() {
	_g_ := getg()
	_g_.m.locks--
}

//go:linkname sync_runtime_procPin sync.runtime_procPin
//go:nosplit
func sync_runtime_procPin() int {
	return procPin()
}

//go:linkname sync_runtime_procUnpin sync.runtime_procUnpin
//go:nosplit
func sync_runtime_procUnpin() {
	procUnpin()
}

//go:linkname sync_atomic_runtime_procPin sync_atomic.runtime_procPin
//go:nosplit
func sync_atomic_runtime_procPin() int {
	return procPin()
}

//go:linkname sync_atomic_runtime_procUnpin sync_atomic.runtime_procUnpin
//go:nosplit
func sync_atomic_runtime_procUnpin() {
	procUnpin()
}

// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
	// sync.Mutex is cooperative, so we are conservative with spinning.
	// Spin only a few times and only if running on a multicore machine,
	// GOMAXPROCS>1, there is at least one other running P, and the local runq is empty.
	// As opposed to runtime mutex we don't do passive spinning here,
	// because there can be work on the global runq or on other Ps.
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}

var stealOrder randomOrder

// randomOrder/randomEnum are helper types for randomized work stealing.
// They allow enumerating all Ps in different pseudo-random orders without repetition.
// The algorithm is based on the fact that if we have X such that X and GOMAXPROCS
// are coprime, then the sequence (i + X) % GOMAXPROCS gives the required enumeration.
type randomOrder struct {
	count    uint32
	coprimes []uint32
}

type randomEnum struct {
	i     uint32
	count uint32
	pos   uint32
	inc   uint32
}

func (ord *randomOrder) reset(count uint32) {
	ord.count = count
	ord.coprimes = ord.coprimes[:0]
	for i := uint32(1); i <= count; i++ {
		if gcd(i, count) == 1 {
			ord.coprimes = append(ord.coprimes, i)
		}
	}
}

func (ord *randomOrder) start(i uint32) randomEnum {
	return randomEnum{
		count: ord.count,
		pos:   i % ord.count,
		inc:   ord.coprimes[i%uint32(len(ord.coprimes))],
	}
}

func (enum *randomEnum) done() bool {
	return enum.i == enum.count
}

func (enum *randomEnum) next() {
	enum.i++
	enum.pos = (enum.pos + enum.inc) % enum.count
}

func (enum *randomEnum) position() uint32 {
	return enum.pos
}

func gcd(a, b uint32) uint32 {
	for b != 0 {
		a, b = b, a%b
	}
	return a
}
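
// The walk below is a minimal, hypothetical sketch; it is not called by the
// scheduler and the count and seed values are made up for illustration only.
// It shows why the coprime increment matters: for 6 Ps, reset finds the
// coprimes {1, 5}, and even a walk whose increment is 5 visits every P index
// exactly once before done() reports completion.
func exampleStealOrderWalk() {
	var ord randomOrder
	ord.reset(6) // coprimes of 6 are 1 and 5
	// Seed 3 selects inc = coprimes[3%2] = 5 and start position 3%6 = 3,
	// so the walk prints 3 2 1 0 5 4: all six positions, no repetition.
	for enum := ord.start(3); !enum.done(); enum.next() {
		print(enum.position(), " ")
	}
	print("\n")
}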