我们需要找到另一种的方法。从一开始我们就开始讨论如何让请求处理程序的生命周期尽可能的短,并在后台产生处理。当然,这是在 RubyonRails必须要做的事情,否则,不管你是使用puma,unicorn还是 passenger,你的所有的可用的web worker都将阻塞。
那么我们就需要利用常见的解决方案来完成这项工作,比如Resque,Sidekiq, SQS等。当然还有其他工具,因为有很多方法可以实现。
因此,我们第二次改进是创建一个buffer channel,我们可以将一些作业请求扔进队列并将它们上传到S3,由于我们可以控制队列的最大长度,并且有足够的RAM来排队处理内存中的作业,因此我们认为只要在通道队列中缓冲作业就行了。
- var Queue chan Payload
-
- func init() {
- Queue = make(chan Payload, MAX_QUEUE)
- }
-
- func payloadHandler(w http.ResponseWriter, r *http.Request) {
- ...
- // Go through each payload and queue items individually to be posted to S3
- for _, payload := range content.Payloads {
- Queue <- payload
- }
- ...
- }
然后,为了将任务从buffer channel中取出并处理它们,我们正在使用这样的方式:
- func StartProcessor() {
- for {
- select {
- case job := <-Queue:
- job.payload.UploadToS3() // <-- STILL NOT GOOD
- }
- }
- }
说实话,我不知道我们在想什么,这肯定是一个难熬的夜晚。这种方法并没有给我们带来什么提升,我们用一个缓冲的队列替换了有缺陷的并发,也只是推迟了问题的产生时间而已。我们的同步处理器每次只向S3上传一个有效载荷,由于传入请求的速率远远大于单个处理器上传到S3的能力,因此我们的buffer channel迅速达到极限,队列已经阻塞并且无法再往里边添加作业。
我们只是简单的绕过了这个问题,最终导致我们的系统完全崩溃。在我们部署这个有缺陷的版本后,我们的延迟持续的升高。
更好的解决方案
我们决定在Go channel上使用一个通用模式来创建一个 2-tier(双重)channel系统,一个用来处理排队的job,一个用来控制有多少worker在 JobQueue上并发工作。
这个想法是将上传到S3的并行速度提高到一个可持续的速度,同时不会造成机器瘫痪,也不会引发S3的连接错误。
所以我们选择创建一个 Job/Worker模式。对于那些熟悉Java,C#等的人来说,可以将其视为Golang使用channel来实现WorkerThread-Pool的方式。
- var (
- MaxWorker = os.Getenv("MAX_WORKERS")
- MaxQueue = os.Getenv("MAX_QUEUE")
- )
-
- // Job represents the job to be run
- type Job struct {
- Payload Payload
- }
-
- // A buffered channel that we can send work requests on.
- var JobQueue chan Job
-
- // Worker represents the worker that executes the job
- type Worker struct {
- WorkerPool chan chan Job
- JobChannel chan Job
- quit chan bool
- }
-
- func NewWorker(workerPool chan chan Job) Worker {
- return Worker{
- WorkerPool: workerPool,
- JobChannel: make(chan Job),
- quit: make(chan bool)}
- }
-
- // Start method starts the run loop for the worker, listening for a quit channel in
- // case we need to stop it
- func (w Worker) Start() {
- go func() {
- for {
- // register the current worker into the worker queue.
- w.WorkerPool <- w.JobChannel
-
- select {
- case job := <-w.JobChannel:
- // we have received a work request.
- if err := job.Payload.UploadToS3(); err != nil {
- log.Errorf("Error uploading to S3: %s", err.Error())
- }
-
- case <-w.quit:
- // we have received a signal to stop
- return
- }
- }
- }()
- }
-
- // Stop signals the worker to stop listening for work requests.
- func (w Worker) Stop() {
- go func() {
- w.quit <- true
- }()
- }
(编辑:ASP站长网)
|