golang分钟级百万请求处理(翻译)
本文翻译自:
http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
背景
在 Malwarebytes,我们正在经历着惊人的增长,自从我于一年前加入了这家硅谷公司以来,我的主要责任之一就是设计和开发多个系统,为一个快速增长的安全公司提供动力以及支持每天由数百万人使用的产品所需的基础设施。我已经在几家不同的反病毒和反恶意软件行业工作了12年,知道由于我们每天处理的大量数据,这些系统可能变得非常复杂。
有趣的是,过去大约9年中,我参与的所有Web后端开发基本上都是在Ruby on Rails中完成的。别误会,我很喜欢Ruby on Rails,我认为它是一个神奇的环境,但久而久之,你开始以Ruby的方式思考和设计系统,忘记了如果你能利用多线程、并行、快速执行和小内存开销,你的软件架构会多么高效简单。在许多年里,我是一个C/C++、Delphi和C#开发人员,我刚刚开始意识到如果有正确的工具来完成工作,事情可以变得更简单不是吗。
作为首席架构师,我不太关注网络世界上经常进行的语言和框架战争。我相信效率、生产力和代码可维护性主要取决于你如何简单地设计你的解决方案。
问题
在开发我们的匿名遥测和分析系统的一个模块时,我们的目标是能够处理来自数百万端点的大量POST请求。Web处理程序将接收JSON文档,该文档可能包含许多有效负载的集合,这些有效负载需要写入Amazon S3中,以便我们的MapReduce系统以后可以对这些数据进行操作。
传统上,我们会考虑创建一个工作层架构,利用诸如以下工具:
- Sidekiq
- Resque
- DelayedJob
- Elasticbeanstalk Worker Tier
- RabbitMQ
等等...
然后设置两个不同的集群,一个用于Web前端,另一个用于工人,以便我们可以扩展我们可以处理的后台工作量。
但从开始,我们的团队就知道我们应该用Go来做这件事,因为在讨论阶段我们看到这可能是一个非常大的流量系统。我已经使用Go约两年的时间了,在这里我们开发了一些系统,但没有任何一个系统可以承受这么大的负载。
我们首先创建了一些结构来定义我们将通过POST调用接收到的Web请求有效负载,以及将其上传到我们的S3存储桶中的方法。
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
最初我们采用了一种非常朴素的Go协程处理方式,尝试将作业处理并行化为一个简单的协程
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}
w.WriteHeader(http.StatusOK)
}
对于中等负载,这对大多数人可能有效,但很快就证明不适用于大规模。我们预计会有许多请求,但当我们将第一个版本部署到生产中时,看到的请求数量远远超出我们的预期。我们完全低估了流量。
上述方法有多个问题。我们无法控制生成的Go程数量,而且由于我们每分钟收到100万个POST请求,当然这段代码很快就会崩溃。
再次尝试
我们需要找到一种不同的方法。从一开始,我们就开始讨论我们如何需要保持请求处理程序的生命周期非常短并在后台生成处理。当然,在Ruby on Rails世界中,这就是你必须做的,否则你将阻塞所有可用的工作进程Web处理器,无论你是使用puma、unicorn、passenger(请不要涉及JRuby的讨论)。然后,我们需要利用常见的解决方案来做到这一点,例如Resque,Sidekiq,SQS等。这个列表还很长,因为有许多实现这个目标的方法。
因此,第二次迭代是创建带缓冲的通道,在其中可以对一些工作进行排队并将它们上传到S3,由于我们可以控制队列中最大项目数,并且我们有足够的RAM在内存中对工作进行排队,所以我们认为只要将工作缓冲在通道队列中就可以了。
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}
然后实际上出队列工作并处理它们,我们使用的类似于这个东西:
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}
说实话,我不知道我们当时在想什么。这一定是一个充满了红牛能量的深夜。这种方法并没有带给我们任何好处,我们用缓冲队列代替了有缺陷的并发性,这只是推迟了问题。我们的同步处理器一次只上传一个有效载荷到 S3,由于请求的速度远远大于单个处理器上传到 S3的能力,我们的缓冲通道很快就达到了极限,并阻止了请求处理程序的队列能力来排队更多的项。
我们只是在回避问题,并开始倒计时,最终导致了系统的崩溃。自从我们部署了这个有缺陷的版本之后,我们的延迟率以恒定的速率不断增加。
更好的解决方案
我们决定在使用Go通道时采用通用模式,以创建一个二层通道系统,一个用于排队作业,另一个用于控制有多少工作程序同时操作JobQueue。
这个想法是将上传到S3的并行化到一个相对可持续的速率,既不会使机器崩溃,也不会从S3生成连接错误。因此,我们选择创建一个Job/Worker模式。对于那些熟悉Java、C#等语言的人来说,可以将其视为使用通道实现Worker线程池的Golang方式。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
我们修改了Web请求处理程序,使用有效载荷创建Job结构体实例,然后将其发送到JobQueue通道中供工作人员挑选。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
// let's create a job with the payload
work := Job{Payload: payload}
// Push the work onto the queue.
JobQueue <- work
}
w.WriteHeader(http.StatusOK)
}
在我们的 Web 服务器初始化期间,我们创建了一个 Dispatcher ,并调用 Run()函数来创建工作线程池并开始监听出现在 JobQueue 中的作业。
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
以下是我们调度程序实现的代码:
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
请注意,我们提供要实例化并添加到工作池中的最大工作人员数量。由于我们在此项目中使用了基于Docker的Go环境的Amazon Elasticbeanstalk,并且我们始终尝试遵循12因素方法来配置我们的生产系统,因此我们从环境变量中读取这些值。这样,我们就可以控制工作人员的数量和作业队列的最大大小,因此我们可以快速调整这些值,而无需重新部署集群。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
弹性负载均衡器完全启动后的几分钟内,我们看到弹性 Beanstalk 应用程序每分钟服务接近一百万次请求。在早上的几个小时内,我们通常会出现交通高峰,每分钟超过一百万。
新代码部署后不久,服务器数量从 100 台减少到约 20 台
在我们正确配置了集群和自动扩缩容设置之后,我们能够将它降低到仅使用 4 个 EC2 c4.Large 实例,并设置 Elastic Auto-Scaling 如果 CPU 持续 5 分钟以上超过 90%,就会产生一个新的实例
结论
在我看来,简洁总是最重要的。我们本可以设计一个包含许多队列、后台工作进程、复杂部署的复杂系统,但相反我们决定利用Elasticbeanstalk自动扩展的功能和Golang所提供的高效且简单的并发处理方式。
并不是每天都有一个由仅4台机器组成的集群,它们可能比我当前的MacBook Pro还要不那么强大,但能够处理每分钟1百万次写入Amazon S3存储桶的POST请求。
总有合适的工具来完成特定的工作。当你的Ruby on Rails系统需要一个非常强大的Web处理器时,不妨考虑一下Ruby生态系统之外的简单但更强大的替代方案。