」工欲善其事,必先利其器。「—孔子《論語.錄靈公》
首頁 > 程式設計 > 使用 GitLab CI/CD 和 Terraform 實作 Lambda 以進行 SFTP 整合、Go 中的 S Databricks

使用 GitLab CI/CD 和 Terraform 實作 Lambda 以進行 SFTP 整合、Go 中的 S Databricks

發佈於2024-11-09
瀏覽:266

Implementando uma Lambda com GitLab CI/CD e Terraform para Integração SFTP, S Databricks em Go

通过 Databricks 中的流程自动化降低成本

我的客户需要降低在 Databricks 上运行的流程的成本。 Databricks 负责的功能之一是从各种 SFTP 收集文件,解压缩它们并将它们放入数据湖中。

自动化数据工作流程是现代数据工程的重要组成部分。在本文中,我们将探讨如何使用 GitLab CI/CD 和 Terraform 创建 AWS Lambda 函数,该函数允许 Go 应用程序连接到 SFTP 服务器、收集文件、将其存储在 Amazon S3 中,并最终在 Databricks 上触发作业。这种端到端的流程对于依赖高效数据集成和自动化的系统至关重要。

阅读本文需要什么

  • 具有项目存储库的 GitLab 帐户。
  • 有权创建 Lambda、S3 和 IAM 资源的 AWS 账户。
  • 具有创建和运行作业权限的 Databricks 帐户。
  • Go、Terraform 和 GitLab CI/CD 的基础知识。

第 1 步:准备 Go 应用程序

首先创建一个 Go 应用程序,该应用程序将连接到 SFTP 服务器以收集文件。使用 github.com/pkg/sftp 等软件包建立 SFTP 连接,并使用 github.com/aws/aws-sdk-go 与 AWS S3 服务交互。

package main

import (
 "fmt"
 "log"
 "os"
 "path/filepath"

 "github.com/pkg/sftp"
 "golang.org/x/crypto/ssh"
 "github.com/aws/aws-sdk-go/aws"
 "github.com/aws/aws-sdk-go/aws/session"
 "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
 // Configuração do cliente SFTP
 user := "seu_usuario_sftp"
 pass := "sua_senha_sftp"
 host := "endereco_sftp:22"
 config := &ssh.ClientConfig{
  User: user,
  Auth: []ssh.AuthMethod{
   ssh.Password(pass),
  },
  HostKeyCallback: ssh.InsecureIgnoreHostKey(),
 }

 // Conectar ao servidor SFTP
 conn, err := ssh.Dial("tcp", host, config)
 if err != nil {
  log.Fatal(err)
 }
 client, err := sftp.NewClient(conn)
 if err != nil {
  log.Fatal(err)
 }
 defer client.Close()

 // Baixar arquivos do SFTP
 remoteFilePath := "/path/to/remote/file"
 localDir := "/path/to/local/dir"
 localFilePath := filepath.Join(localDir, filepath.Base(remoteFilePath))
 dstFile, err := os.Create(localFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer dstFile.Close()

 srcFile, err := client.Open(remoteFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer srcFile.Close()

 if _, err := srcFile.WriteTo(dstFile); err != nil {
  log.Fatal(err)
 }

 fmt.Println("Arquivo baixado com sucesso:", localFilePath)

 // Configuração do cliente S3
 sess := session.Must(session.NewSession(&aws.Config{
  Region: aws.String("us-west-2"),
 }))
 uploader := s3manager.NewUploader(sess)

 // Carregar arquivo para o S3
 file, err := os.Open(localFilePath)
 if err != nil {
  log.Fatal(err)
 }
 defer file.Close()

 _, err = uploader.Upload(&s3manager.UploadInput{
  Bucket: aws.String("seu-bucket-s3"),
  Key:    aws.String(filepath.Base(localFilePath)),
  Body:   file,
 })
 if err != nil {
  log.Fatal("Falha ao carregar arquivo para o S3:", err)
 }

 fmt.Println("Arquivo carregado com sucesso no S3")
}

步骤 2:配置 Terraform

Terraform 将用于在 AWS 上配置 Lambda 函数和所需资源。使用创建 Lambda 函数、IAM 策略和 S3 存储桶所需的配置创建 main.tf 文件。

provider "aws" {
  region = "us-east-1"
}

resource "aws_iam_role" "lambda_execution_role" {
  name = "lambda_execution_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = "sts:AssumeRole",
        Effect = "Allow",
        Principal = {
          Service = "lambda.amazonaws.com"
        },
      },
    ]
  })
}

resource "aws_iam_policy" "lambda_policy" {
  name        = "lambda_policy"
  description = "A policy that allows a lambda function to access S3 and SFTP resources"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = [
          "s3:ListBucket",
          "s3:GetObject",
          "s3:PutObject",
        ],
        Effect = "Allow",
        Resource = [
          "arn:aws:s3:::seu-bucket-s3",
          "arn:aws:s3:::seu-bucket-s3/*",
        ],
      },
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" {
  role       = aws_iam_role.lambda_execution_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

resource "aws_lambda_function" "sftp_lambda" {
  function_name = "sftp_lambda_function"

  s3_bucket = "seu-bucket-s3-com-codigo-lambda"
  s3_key    = "sftp-lambda.zip"

  handler = "main"
  runtime = "go1.x"

  role = aws_iam_role.lambda_execution_role.arn

  environment {
    variables = {
      SFTP_HOST     = "endereco_sftp",
      SFTP_USER     = "seu_usuario_sftp",
      SFTP_PASSWORD = "sua_senha_sftp",
      S3_BUCKET     = "seu-bucket-s3",
    }
  }
}

resource "aws_s3_bucket" "s3_bucket" {
  bucket = "seu-bucket-s3"
  acl    = "private"
}

步骤 3:配置 GitLab CI/CD

在 GitLab 中,在 .gitlab-ci.yml 文件中定义 CI/CD 管道。该管道应包括测试 Go 应用程序、运行 Terraform 来配置基础设施的步骤,以及必要时的清理步骤。

stages:
  - test
  - build
  - deploy

variables:
  S3_BUCKET: "seu-bucket-s3"
  AWS_DEFAULT_REGION: "us-east-1"
  TF_VERSION: "1.0.0"

before_script:
  - 'which ssh-agent || ( apt-get update -y && apt-get install openssh-client -y )'
  - eval $(ssh-agent -s)
  - echo "$PRIVATE_KEY" | tr -d '\r' | ssh-add -
  - mkdir -p ~/.ssh
  - chmod 700 ~/.ssh
  - ssh-keyscan -H 'endereco_sftp' >> ~/.ssh/known_hosts

test:
  stage: test
  image: golang:1.18
  script:
    - go test -v ./...

build:
  stage: build
  image: golang:1.18
  script:
    - go build -o myapp
    - zip -r sftp-lambda.zip myapp
  artifacts:
    paths:
      - sftp-lambda.zip
  only:
    - master

deploy:
  stage: deploy
  image: hashicorp/terraform:$TF_VERSION
  script:
    - terraform init
    - terraform apply -auto-approve
  only:
    - master
  environment:
    name: production

第 4 步:与 Databricks 集成

将文件上传到 S3 后,Lambda 函数必须触发 Databricks 中的作业。这可以使用 Databricks API 启动现有作业来完成。

package main

import (
 "bytes"
 "encoding/json"
 "fmt"
 "net/http"
)

// Estrutura para a requisição de iniciar um job no Databricks
type DatabricksJobRequest struct {
 JobID int `json:"job_id"`
}

// Função para acionar um job no Databricks
func triggerDatabricksJob(databricksInstance string, token string, jobID int) error {
 url := fmt.Sprintf("https://%s/api/2.0/jobs/run-now", databricksInstance)
 requestBody, _ := json.Marshal(DatabricksJobRequest{JobID: jobID})
 req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
 if err != nil {
  return err
 }

 req.Header.Set("Content-Type", "application/json")
 req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))

 client := &http.Client{}
 resp, err := client.Do(req)
 if err != nil {
  return err
 }
 defer resp.Body.Close()

 if resp.StatusCode != http.StatusOK {
  return fmt.Errorf("Failed to trigger Databricks job, status code: %d", resp.StatusCode)
 }

 return nil
}

func main() {
 // ... (código existente para conectar ao SFTP e carregar no S3)

 // Substitua pelos seus valores reais
 databricksInstance := "your-databricks-instance"
 databricksToken := "your-databricks-token"
 databricksJobID := 123 // ID do job que você deseja acionar

 // Acionar o job no Databricks após o upload para o S3
 err := triggerDatabricksJob(databricksInstance, databricksToken, databricksJobID)
 if err != nil {
  log.Fatal("Erro ao acionar o job do Databricks:", err)
 }

 fmt.Println("Job do Databricks acionado com sucesso")
}

第 5 步:运行管道

将代码推送到 GitLab 存储库以便管道运行。验证所有步骤是否已成功完成,Lambda 函数是否可运行并与 S3 和 Databricks 正确交互。

一旦您拥有完整的代码并配置了 .gitlab-ci.yml 文件,您就可以按照以下步骤运行管道:

  • 将您的代码推送到 GitLab 存储库:
  git add .
  git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks"
  git push origin master
git add .
git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks"
git push origin master
´´´

  • GitLab CI/CD 将检测新的提交并自动启动管道。
  • 通过访问存储库的 CI/CD 部分来跟踪 GitLab 中管道的执行情况。
  • 如果所有阶段都成功,您的 Lambda 函数将被部署并可供使用。

请记住,您需要在 GitLab CI/CD 中配置环境变量来存储敏感信息,例如访问令牌和私钥。这可以在 GitLab 项目的“设置”>“CI/CD”>“变量”部分中完成。

此外,请确保 Databricks 令牌具有触发作业所需的权限,并且该作业具有提供的 ID。

结论

使用 GitLab CI/CD、Terraform 和 AWS Lambda 等工具可以显着简化数据工程任务的自动化。通过遵循本文中概述的步骤,您可以创建一个强大的系统,自动执行 SFTP、S3 和 Databricks 之间的数据收集和集成,所有这些都具有 Go 的效率和简单性。通过这种方法,您将有能力解决以下问题。大规模数据集成的挑战。

我的联系人:

LinkedIn - Airton Lira Junior

iMasters - Airton Lira Junior

aws #lambda #terraform #gitlab #ci_cd #go #databricks #dataengineering #automation


版本聲明 本文轉載於:https://dev.to/airton_lirajunior_2ddebd/implementando-uma-lambda-com-gitlab-cicd-e-terraform-para-integracao-sftp-s3-e-databricks-em-go-5hc0?1如有侵犯,請聯絡[email protected]刪除
最新教學 更多>
  • 同實例無需轉儲複製MySQL數據庫方法
    同實例無需轉儲複製MySQL數據庫方法
    在同一實例上複製一個MySQL數據庫而無需轉儲在同一mySQL實例上複製數據庫,而無需創建InterMediate sqql script。以下方法為傳統的轉儲和IMPORT過程提供了更簡單的替代方法。 直接管道數據 MySQL手動概述了一種允許將mysqldump直接輸出到MySQL cli...
    程式設計 發佈於2025-06-22
  • 如何高效地在一個事務中插入數據到多個MySQL表?
    如何高效地在一個事務中插入數據到多個MySQL表?
    mySQL插入到多個表中,該數據可能會產生意外的結果。雖然似乎有多個查詢可以解決問題,但將從用戶表的自動信息ID與配置文件表的手動用戶ID相關聯提出了挑戰。 使用Transactions和last_insert_id() 插入用戶(用戶名,密碼)值('test','tes...
    程式設計 發佈於2025-06-22
  • 如何在Java字符串中有效替換多個子字符串?
    如何在Java字符串中有效替換多個子字符串?
    在java 中有效地替換多個substring,需要在需要替換一個字符串中的多個substring的情況下,很容易求助於重複應用字符串的刺激力量。 However, this can be inefficient for large strings or when working with nu...
    程式設計 發佈於2025-06-22
  • Go語言如何動態發現導出包類型?
    Go語言如何動態發現導出包類型?
    與反射軟件包中的有限類型的發現能力相反,本文探討了在運行時發現所有包裝類型(尤其是struntime go import( “ FMT” “去/進口商” ) func main(){ pkg,err:= incorter.default()。導入(“ time”) ...
    程式設計 發佈於2025-06-22
  • 編譯器報錯“usr/bin/ld: cannot find -l”解決方法
    編譯器報錯“usr/bin/ld: cannot find -l”解決方法
    錯誤:“ usr/bin/ld:找不到-l “ 此錯誤表明鏈接器在鏈接您的可執行文件時無法找到指定的庫。為了解決此問題,我們將深入研究如何指定庫路徑並將鏈接引導到正確位置的詳細信息。 添加庫搜索路徑的一個可能的原因是,此錯誤是您的makefile中缺少庫搜索路徑。要解決它,您可以在鏈接器命令中添...
    程式設計 發佈於2025-06-22
  • 為什麼不````''{margin:0; }`始終刪除CSS中的最高邊距?
    為什麼不````''{margin:0; }`始終刪除CSS中的最高邊距?
    在CSS 問題:不正確的代碼: 全球範圍將所有餘量重置為零,如提供的代碼所建議的,可能會導致意外的副作用。解決特定的保證金問題是更建議的。 例如,在提供的示例中,將以下代碼添加到CSS中,將解決餘量問題: body H1 { 保證金頂:-40px; } 此方法更精確,避免了由全局保證金重置...
    程式設計 發佈於2025-06-22
  • Python元類工作原理及類創建與定制
    Python元類工作原理及類創建與定制
    python中的metaclasses是什麼? Metaclasses負責在Python中創建類對象。就像類創建實例一樣,元類也創建類。他們提供了對類創建過程的控制層,允許自定義類行為和屬性。 在Python中理解類作為對象的概念,類是描述用於創建新實例或對象的藍圖的對象。這意味著類本身是使用...
    程式設計 發佈於2025-06-22
  • 可以在純CS中將多個粘性元素彼此堆疊在一起嗎?
    可以在純CS中將多個粘性元素彼此堆疊在一起嗎?
    [2这里: https://webthemez.com/demo/sticky-multi-header-scroll/index.html </main> <section> { display:grid; grid-template-...
    程式設計 發佈於2025-06-22
  • 如何正確使用與PDO參數的查詢一樣?
    如何正確使用與PDO參數的查詢一樣?
    在pdo 中使用類似QUERIES在PDO中的Queries時,您可能會遇到類似疑問中描述的問題:此查詢也可能不會返回結果,即使$ var1和$ var2包含有效的搜索詞。錯誤在於不正確包含%符號。 通過將變量包含在$ params數組中的%符號中,您確保將%字符正確替換到查詢中。沒有此修改,PD...
    程式設計 發佈於2025-06-22
  • 如何使用PHP將斑點(圖像)正確插入MySQL?
    如何使用PHP將斑點(圖像)正確插入MySQL?
    essue VALUES('$this->image_id','file_get_contents($tmp_image)')";This code builds a string in PHP, but the function call fil...
    程式設計 發佈於2025-06-22
  • 如何避免Go語言切片時的內存洩漏?
    如何避免Go語言切片時的內存洩漏?
    ,a [j:] ...雖然通常有效,但如果使用指針,可能會導致內存洩漏。這是因為原始的備份陣列保持完整,這意味著新切片外部指針引用的任何對象仍然可能佔據內存。 copy(a [i:] 對於k,n:= len(a)-j i,len(a); k
    程式設計 發佈於2025-06-22
  • MySQL中如何高效地根據兩個條件INSERT或UPDATE行?
    MySQL中如何高效地根據兩個條件INSERT或UPDATE行?
    在兩個條件下插入或更新或更新 solution:的答案在於mysql的插入中...在重複鍵更新語法上。如果不存在匹配行或更新現有行,則此功能強大的功能可以通過插入新行來進行有效的數據操作。如果違反了唯一的密鑰約束。 實現所需的行為,該表必須具有唯一的鍵定義(在這種情況下為'名稱'...
    程式設計 發佈於2025-06-22
  • 如何使用“ JSON”軟件包解析JSON陣列?
    如何使用“ JSON”軟件包解析JSON陣列?
    parsing JSON與JSON軟件包 QUALDALS:考慮以下go代碼:字符串 } func main(){ datajson:=`[“ 1”,“ 2”,“ 3”]`` arr:= jsontype {} 摘要:= = json.unmarshal([] byte(...
    程式設計 發佈於2025-06-22
  • 為什麼使用Firefox後退按鈕時JavaScript執行停止?
    為什麼使用Firefox後退按鈕時JavaScript執行停止?
    導航歷史記錄問題:JavaScript使用Firefox Back Back 此行為是由瀏覽器緩存JavaScript資源引起的。要解決此問題並確保在後續頁面訪問中執行腳本,Firefox用戶應設置一個空功能。 警報'); }; alert('inline Alert')...
    程式設計 發佈於2025-06-22
  • 如何在Java中正確顯示“ DD/MM/YYYY HH:MM:SS.SS”格式的當前日期和時間?
    如何在Java中正確顯示“ DD/MM/YYYY HH:MM:SS.SS”格式的當前日期和時間?
    如何在“ dd/mm/yyyy hh:mm:mm:ss.ss”格式“ gormat 解決方案:的,請訪問量很大,並應為procectiquiestate的,並在整個代碼上正確格式不多: java.text.simpledateformat; 導入java.util.calendar; 導入java...
    程式設計 發佈於2025-06-22

免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。

Copyright© 2022 湘ICP备2022001581号-3