연동해야 하는 API의 특성을 분석하고, 유연성과 확장성을 고려하여 배치 프로그램을 구현해보았습니다.
1. API 연동 방식
- HTTP 요청 메소드는 Get 방식입니다.
- 요청 URL – API 종류마다 동일한 Url 엔드포인트에 Uri와 파라메터들에 대한 요청 값만 변경설정하여 요청하는 방식으로 일관성있는 포맷으로 정의됩니다.
- 응답 메시지 포멧은 V1.3 API와 Bulk API 두 방식으로 지원됩니다.
- V1.3 API는 JSON 형식입니다.
- Bulk API는 다운가능한 Bulk file URL 리스트가 Json 형식으로 수신됩니다.
- Bulk API는 수신된 Bulk file URL을 개별적으로 HTTP Get 방식으로 호출하여 .csv.gz 파일을 다운로드 받는 형태입니다.
2. python 스크립트 구성
- API 연동방식에 대한 이해를 토대로 python 배치 스크립트를 V1.3 API 용, Bulk API용 두가지로 개발하였습니다.
- 최종 수신된 데이터는 S3에 boto3 파이썬 라이브러리로 적재합니다.
def data_api_v_1_3_call(bucket, apiurl, apikey, group, filename):
try:
start = time.time()
payload = {}
headers = {
'Authorization': 'Bearer {}'.format(apikey)
}
response = requests.request("GET", apiurl, headers=headers, data=payload)
print(response.text)
json_data = json.loads(response.text)
result = upload_file_s3(bucket, '{}/{}.json'.format(group, filename), json_data)
statusCode = result['ResponseMetadata']['HTTPStatusCode']
print("elapsed time : ", time.time() - start, "sec", " statusCode : ", statusCode)
if statusCode == 200:
return {
'statusCode': statusCode,
'body': json.dumps('success')
}
else:
return {
'statusCode': statusCode,
'body': json.dumps('fail {}'.format(result))
}
except Exception as e:
print(e)
return {
'statusCode': 400,
'body': json.dumps('fail {}'.format(e))
}
def upload_file_s3(bucket, file_name, file):
encode_file = bytes(json.dumps(file).encode('UTF-8'))
s3 = boto3.client('s3')
result = s3.put_object(Bucket=bucket, Key=file_name, Body=encode_file)
print(result)
return result
def bulk_api_call(bucket, apiurl, apikey, download_path, download_url_name):
try:
start = time.time()
s3 = boto3.client('s3')
payload = {}
headers = {
'Authorization': 'Bearer {}'.format(apikey),
'Accept': 'application/json'
}
response = requests.request("GET", apiurl, headers=headers, data=payload)
print(response.status_code, response.text)
json_data = json.loads(response.text)
for i in range(len(json_data['data']['urls'])):
download_start = time.time()
download_url = json_data['data']['urls'][i]
print("url >>>> {}".format(download_url))
if "dimensions" == download_url_name:
if "name=metrics" in download_url:
continue
if "metrics" == download_url_name:
if "name=dimensions" in download_url:
continue
upload_file_path, file_name = get_object_path(download_url)
payload = {}
headers = {
'Authorization': 'Bearer {}'.format(apikey),
'Authentication': '{}'.format(apikey)
}
response = requests.request("GET", download_url, headers=headers, data=payload)
print(response.status_code)
save_directory = "{}/{}".format(download_path, upload_file_path)
if not os.path.exists(save_directory):
os.makedirs(save_directory, True)
f = open("{}/{}.csv.gz".format(save_directory, file_name), 'wb')
for chunk in response.iter_content(chunk_size=512 * 1024):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
f.close()
s3.upload_file("{}/{}.csv.gz".format(save_directory, file_name), bucket,
"{}/{}.csv.gz".format(upload_file_path, file_name))
os.remove("{}/{}.csv.gz".format(save_directory, file_name))
print("{}. download elapsed time : {} sec".format(i, time.time() - download_start))
print("total elapsed time : ", time.time() - start, "sec")
return {
'statusCode': 200,
'body': json.dumps('success')
}
except Exception as e:
print(e)
return {
'statusCode': 400,
'body': json.dumps('fail {}'.format(e))
}
- 제공되는 수십가지의 API 종류들에 대해서는 쉘 스크립트에서 파라메터로 위에 작성한 두가지 python 배치 스크립트에 전달하여 호출하는 방식으로 구성하실 수 있습니다.
3. S3 적재
1. S3콘솔에서 버킷만들기 버튼을 클릭합니다.
2. 버킷이름을 입력하고 리전을 선택합니다.
3. 태그를 구성하고 버킷을 생성합니다.
4. API 1.3으로 부터 수신된 로우 데이터를 저장하는 버킷과, Bulk Data API로 부터 수신된 .csv.gz 파일을 저장하는 버킷 두개를 생성해 둡니다.
* 파이썬 배치프로그램에서 로직을 간소화하여 유연성있게 개발한 만큼 S3 버킷의 적재 경로 또한 동적으로 호출되는 API 특성에 따라 자동 생성되도록 설계하여 프로그밍했습니다. 아래와 같은 형식으로 적재될 것입니다.
<API V1.3인 경우 S3 버킷 적재 구조>
<BULK API인 경우 S3 버킷 적재 구조>
4. 로컬환경 테스트
1.[Hands On] CodePipeline으로 CI/CD 구축에서 언급한 CodeCommit 리포지토리 git clone 명령을 통해 로컬환경에 다운로드 해 둡니다.
2. 저는 파이썬 IDE로 pycharm community edition에서 작업하였습니다.
A. AWS 자격증명 환경설정
로컬환경에서 AWS SDK를 통해 작업하기 위해 자격증명설정을 합니다.
1. IAM 사용자 메뉴에서 작업할 사용자를 선택합니다. 사용자가 admin 그룹권한에 연결되어있는 관계로,
[Hands On] CodePipeline으로 CI/CD 구축 에서 부여한 AWSCodeCommitPowerUser 이외의 권한을 별도로 부여해주진 않았습니다.
2. 상세화면에서 보안자격증명 탭의 엑세스키 만들기를 클릭합니다.
3. 자격증명이 생성되고 .csv 파일 다운로드가 활성화되어집니다.
4. .csv 파일을 다운로드받아, 로컬에 보관해둡니다. 초기 생성시에만 다운로드가 가능합니다.
5. 로컬환경의 터미널에서 aws configure 명령으로 다운로드 받은 자격증명 정보를 참고로 설정해둡니다.
B. 로컬환경에서 배치 실행
1. 쉘 스트립트에서 참조되는 환경변수값(경로 정보, S3버킷정보 등)을 로컬 테스트 환경에 맞게 설정합니다.
2. 윈도우에서는 powershell, 명령프롬프트 등을 이용해서 linux bash shell명령이 정상적으로 실행되지 않습니다. cygwin 등의 터미널 프로그램을 설치하면 가능합니다.
3. 실행 소스 경로로 이동합니다. cd [소스경로]
4. 실행할 쉘 스크립트에 실행권한을 부여합니다. chmod 755 [쉘 스크립트 파일 경로]
5. 쉘스크립트를 실행합니다. ./쉘스크립트
6. 터미널에서 실행메시지가 출력되어지는 것을 확인가능합니다. API 요청으로 S3까지 .csv.gz 파일 업로드까지 진행되어집니다. S3 버킷경로에서 위에 언급한 구조로 로우 데이터가 적재되어져 있음을 확인가능합니다.
5. EC2 테스트
- 최초, EC2 인스턴스내에 [Hands On] CodePipeline으로 CI/CD 구축을 진행한 후 위에 로컬 테스트 진행한것처럼 배치스크립트가 정상실행되는지 확인해봅니다.
- 코드 변경후 로컬테스트 혹은 테스트환경의 별도 EC2에서 실제 운영되지 않는 S3버킷을 생성하여 테스트 해 본 후 정상동작이 확인되면 codecommit에 push하는 것을 권장드립니다.
- 개발, 운영 프로파일 따라서 동작되도록 배치 프로그램을 리팩토링 하는 과정 또한 권장드립니다.
A. AWS 자격증명 환경설정
저는 POC과정에서 로컬 테스트 환경과 동일하게 EC2 인스턴스 환경내에 AWS 자격증명을 환경설정에 추가해주었습니다.
IAM 인스턴스에 IAM Role을 설정하여 리소스들에 접근 권한을 부여하는 방법 또한 제공되니, 이 방법으로 변경하시는것을 고려해보시기 바랍니다.
1. root 권한으로 변경합니다. sudo su-
2. aws configure명령으로 생성해 놓은 보안 자격증명으로 EC2내에서 환경정보로 설정해둡니다.
3. root 최상위 경로에 .aws 파일이 생성된것을 확인가능합니다.
B. EC2에서 배치 실행
1. CodeDeploy로 배포가 이루어지면서 실행 쉘에 실행권한이 이미 부여된 상태입니다.
2. 쉘 스크립트가 있는 경로로 이동해서 쉘을 실행해봅니다.
6. 배치 스케줄링 설정
1. EC2내에 배치스크립트가 있는 경로로 이동합니다. cd /home/ec2-user/dplanex_data_ai
2. 배치 수행여부를 확인 할 수 있는 로그를 남기기 위한 디렉토리를 생성합니다. mkdir logs
3. 배치 스케줄링을 EC2 linux 환경의 crontab에 설정하였습니다.
4. root 계정으로 crontab -e 명령을 입력합니다.
5. 편집가능한 상태에서 스케줄 정보를 입력합니다. date명령으로 확인시 ec2 환경의 타임존이 UTC인 관계로 아래와 같이 설정할 경우 KST로 9시 5분에 Bulk Api 스크립트가 동작하게 됩니다.
5 0 * * * /home/ec2-user/dplanex_data_ai/scripts/api_bulk_urls/bulk_api.sh >> /home/ec2-user/dplanex_data_ai/logs/bulk_api_`date “+\%Y-\%m-\%d”`.log 2>&1
6. crontab -l로 설정한 스케줄링 정보를 확인합니다.