Initial commit - Organizing all Python vibes

This commit is contained in:
Andrew 2025-12-29 22:15:52 +10:00
commit 9b2911fae4
6 changed files with 1046 additions and 0 deletions

17
.gitignore vendored Normal file
View file

@@ -0,0 +1,17 @@
# Python bytecode
__pycache__/
*.py[cod]
*$py.class
# Virtual Environments
venv/
.venv/
env/
# Sensitive data (API keys, etc.)
.env
secrets.json
# Data files
*.csv
*.xlsx

View file

@@ -0,0 +1,111 @@
import requests
import pandas as pd
import csv
# --- CONFIGURATION ---
# IMPORTANT: Replace this with the full URL for your Elvanto report,
# including the authkey.
# NOTE(review): the URL below embeds a live authkey (a secret). Consider
# loading it from an environment variable or a .env file (already covered
# by .gitignore) instead of committing it to source control.
REPORT_URL = "https://cairnspc.elvanto.com.au/report/?id=c5ab005a-f2be-403e-84a5-85870ac4a41b&authkey=lguYFfS9"

# The name of the file where the data will be saved.
OUTPUT_FILE = "scraped_elvanto_report.csv"

# This mapping helps rename columns from the Elvanto HTML report
# to the desired CSV column names. This is the MOST LIKELY part you'll need to edit.
#
# - Keys: The column names as they appear EXACTLY in the Elvanto report table.
# - Values: The new column names you want in your final CSV.
#
# Open the report link in your browser to see the exact column headers and update the keys below.
# Note: Column names might be slightly different between yearly tables, so check them all.
COLUMN_MAPPING = {
    'Date': 'Date',
    'Service': 'Service',
    'Total Individuals': 'Total Individuals',  # This might be 'Total Individual' or 'Sum' in your report
    'Adults': 'Adults',
    'Children': 'Children',
    'Guests': 'Guests',  # Elvanto reports often use 'Visitors' for guests.
}
# --- END OF CONFIGURATION ---
def main():
    """
    Fetch the Elvanto report page, scrape every HTML data table on it,
    rename columns via COLUMN_MAPPING, and write the combined rows to
    OUTPUT_FILE as a fully-quoted CSV.

    Side effects: prints progress to stdout and writes OUTPUT_FILE.
    """
    if "YOUR_REPORT_URL_HERE" in REPORT_URL:
        print("Error: Please update the 'REPORT_URL' in the script before running.")
        return
    print(f"Attempting to fetch data from your Elvanto report URL...")
    try:
        # Use headers to mimic a web browser, which can help with access.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        # A timeout prevents the script from hanging forever on a stalled server.
        response = requests.get(REPORT_URL, headers=headers, timeout=30)
        response.raise_for_status()
        print("Successfully fetched the report page.")
        # pandas.read_html() finds every <table> tag on the page and returns
        # a list of DataFrames, one per table. Wrapping the HTML in StringIO
        # avoids the pandas >= 2.1 deprecation of passing literal HTML strings.
        from io import StringIO
        tables = pd.read_html(StringIO(response.text))
        if not tables:
            print("\nError: No data tables were found on the report page.")
            print("Please check that the URL is correct and the report is loading properly in your browser.")
            return
        print(f"Found {len(tables)} data table(s) on the page. Combining them...")
        all_dataframes = []
        for i, table_df in enumerate(tables):
            print(f"--- Processing table {i+1} of {len(tables)} ---")
            processed_df = table_df.copy()
            processed_df.rename(columns=COLUMN_MAPPING, inplace=True)
            # Keep only the columns we know how to map, in mapping order.
            required_columns = list(COLUMN_MAPPING.values())
            final_columns = [col for col in required_columns if col in processed_df.columns]
            if not final_columns:
                print(f"Warning: No mappable columns found in table {i+1}. Skipping this table.")
                continue
            all_dataframes.append(processed_df[final_columns])
        if not all_dataframes:
            print("\nError: After processing all tables, no valid data could be extracted.")
            print("Please double-check your 'COLUMN_MAPPING' configuration.")
            return
        # Concatenate all the processed dataframes into one.
        df_final = pd.concat(all_dataframes, ignore_index=True)
        print("\nSuccessfully combined all tables.")
        # Data cleaning: drop summary rows ('Total' / 'Average'), matched in
        # the first column.
        df_final = df_final[~df_final.iloc[:, 0].astype(str).str.contains('Total|Average', na=False)]
        # Normalise and sort the date column if present (newest first).
        if 'Date' in df_final.columns:
            try:
                df_final['Date'] = pd.to_datetime(df_final['Date']).dt.strftime('%Y-%m-%d %H:%M:%S')
                df_final.sort_values(by='Date', ascending=False, inplace=True)
            except Exception as e:
                print(f"\nWarning: Could not automatically format the 'Date' column. It may contain unexpected values. Error: {e}")
        df_final.to_csv(OUTPUT_FILE, index=False, quoting=csv.QUOTE_ALL)
        print(f"\nSuccess! Scraped {len(df_final)} records and saved them to '{OUTPUT_FILE}'.")
        print(f"Final columns in CSV: {df_final.columns.tolist()}")
    except requests.exceptions.RequestException as e:
        print(f"\nError: Failed to fetch the URL. Please check the REPORT_URL and your internet connection. Details: {e}")
    except Exception as e:
        print(f"\nAn unexpected error occurred during processing: {e}")


if __name__ == "__main__":
    main()

View file

@@ -0,0 +1,367 @@
import pandas as pd
import xlsxwriter
# --- 1. SETUP THE DATA (Effective June 2025) ---
# Fortnightly pay periods per year; the annual figures below are the
# fortnightly amounts multiplied by 26.
ANNUAL_FACTOR = 26

# Stipend Rates (Base Annual Amounts).
# Keys follow the "<Zone>_<Position>" convention so the worksheet can
# VLOOKUP on a single concatenated key cell (written to Z6 further down).
stipend_data = {
    'Key': [
        'Urban_Ordained Minister', 'Provincial_Ordained Minister', 'Rural_Ordained Minister', 'Remote_Ordained Minister',
        'Urban_Home Missionary', 'Provincial_Home Missionary', 'Rural_Home Missionary', 'Remote_Home Missionary',
        'Urban_Specialised Ministry Worker', 'Provincial_Specialised Ministry Worker', 'Rural_Specialised Ministry Worker', 'Remote_Specialised Ministry Worker'
    ],
    # Annual Figures (Fortnightly x 26)
    'Cash': [41782, 42627, 43459, 45136, 37193, 37934, 38675, 40170, 31343, 31967, 32591, 33852],
    # EPFB rates mirror the cash rates — presumably the fringe-benefit
    # component equals the cash component; TODO confirm against the schedule.
    'EPFB': [41782, 42627, 43459, 45136, 37193, 37934, 38675, 40170, 31343, 31967, 32591, 33852],
    # Ministry Expense Allowance; varies by zone, repeated for each position.
    'MEA': [12974, 14040, 18460, 19864, 12974, 14040, 18460, 19864, 12974, 14040, 18460, 19864]
}

# Manse Deduction (Annual), keyed by zone.
manse_data = {
    'Zone': ['Urban', 'Provincial', 'Rural', 'Remote'],
    'Deduction': [26104, 26104, 24388, 24388]
}
df_rates = pd.DataFrame(stipend_data)
df_manse = pd.DataFrame(manse_data)

# --- 2. CREATE THE EXCEL FILE ---
filename = 'PCQ_Form_B_Final_Fixed.xlsx'
writer = pd.ExcelWriter(filename, engine='xlsxwriter')
workbook = writer.book
# --- 3. FORMATS ---
# Colour scheme: yellow (#FFF2CC) = user input, grey (#E7E6E6) = locked /
# calculated, blue = section headers.
fmt_header = workbook.add_format({'bold': True, 'bg_color': '#4F81BD', 'font_color': 'white', 'border': 1})
fmt_subhead = workbook.add_format({'bold': True, 'bg_color': '#DCE6F1', 'border': 1})
fmt_subhead_c = workbook.add_format({'bold': True, 'bg_color': '#DCE6F1', 'border': 1, 'align': 'center'})
fmt_input = workbook.add_format({'bg_color': '#FFF2CC', 'border': 1})
fmt_locked = workbook.add_format({'bg_color': '#E7E6E6', 'border': 1})
fmt_currency_locked = workbook.add_format({'num_format': '$#,##0', 'bg_color': '#E7E6E6', 'border': 1})
fmt_currency_input = workbook.add_format({'num_format': '$#,##0', 'bg_color': '#FFF2CC', 'border': 1})
fmt_percent = workbook.add_format({'num_format': '0%', 'bg_color': '#FFF2CC', 'border': 1})
fmt_int_input = workbook.add_format({'num_format': '0', 'bg_color': '#FFF2CC', 'border': 1})
fmt_total = workbook.add_format({'num_format': '$#,##0', 'bold': True, 'bg_color': '#D9D9D9', 'border': 1})
# Negative results render red and parenthesised.
fmt_result = workbook.add_format({'num_format': '$#,##0;[Red]($#,##0)', 'bold': True, 'font_size': 12, 'border': 1})
fmt_title = workbook.add_format({'bold': True, 'font_size': 14})
fmt_deduction = workbook.add_format({'num_format': '-$#,##0', 'bg_color': '#E7E6E6', 'font_color': '#9C0006', 'border': 1})

# --- 4. SHEET: "Hidden_Data" ---
# Lookup tables live on a hidden sheet: stipend rates in columns A:E,
# manse deductions in columns G:H (startcol=6).
df_rates.to_excel(writer, sheet_name='Hidden_Data', index=False)
df_manse.to_excel(writer, sheet_name='Hidden_Data', startcol=6, index=False)
writer.sheets['Hidden_Data'].hide()
# --- 5. SHEET: "Form B Calculator" ---
ws = workbook.add_worksheet('Form B Calculator')
ws.set_column('A:A', 5)
ws.set_column('B:B', 45)
ws.set_column('C:C', 25)
ws.set_column('D:D', 40)
# Title
ws.write('B2', 'MINISTRY SUPPORT FUND SCHEDULE (FORM B) - 2025 CALCULATOR', fmt_title)
# === SECTION 1: APPOINTMENT DETAILS ===
ws.write('B4', '1. APPOINTMENT DETAILS', fmt_header)
ws.write('C4', 'SELECTION', fmt_header)
ws.write('D4', 'NOTES', fmt_header)
# Base row (0-based) for Section 1; inputs are written at r1+1 .. r1+11,
# i.e. column C of Excel rows 6-16.
r1 = 4
# Dropdown lists for data validation on the input cells.
list_zones = {'validate': 'list', 'source': ['Urban', 'Provincial', 'Rural', 'Remote']}
list_roles = {'validate': 'list', 'source': ['Ordained Minister', 'Home Missionary', 'Specialised Ministry Worker']}
list_manse_type = {'validate': 'list', 'source': ['No Manse', 'Church Owned Manse', 'Church Rented Manse']}
list_yesno = {'validate': 'list', 'source': ['Yes', 'No']}
list_energy = {'validate': 'list', 'source': ['Appointee Pays', 'Church Pays 100%', 'Fixed Amount (Paid by Church)']}
# -- Basic Details --
ws.write(r1+1, 1, 'Charge Name', fmt_locked)
ws.write(r1+1, 2, '', fmt_input)
ws.write(r1+2, 1, 'Zone', fmt_locked)              # input lands in Excel C7
ws.data_validation(r1+2, 2, r1+2, 2, list_zones)
ws.write(r1+2, 2, 'Provincial', fmt_input)
ws.write(r1+3, 1, 'Position', fmt_locked)          # input lands in Excel C8
ws.data_validation(r1+3, 2, r1+3, 2, list_roles)
ws.write(r1+3, 2, 'Ordained Minister', fmt_input)
ws.write(r1+4, 1, 'Work Load %', fmt_locked)       # input lands in Excel C9
ws.write(r1+4, 2, 1.0, fmt_percent)
ws.write(r1+4, 3, '100% = Full Time, 50% = Half Time')
# -- Loading --
ws.write(r1+5, 1, 'Stipend Loading % (Optional)', fmt_locked)  # Excel C10
ws.write(r1+5, 2, 0.0, fmt_percent)
ws.write(r1+5, 3, '% Paid above minimum')
# -- Manse Details --
ws.write(r1+6, 1, 'Manse Arrangement', fmt_locked)  # Excel C11
ws.data_validation(r1+6, 2, r1+6, 2, list_manse_type)
ws.write(r1+6, 2, 'No Manse', fmt_input)
# Guideline 210
ws.write(r1+7, 1, 'Does Manse meet Guideline 210?', fmt_locked)
ws.data_validation(r1+7, 2, r1+7, 2, list_yesno)
ws.write(r1+7, 2, 'Yes', fmt_input)
ws.write(r1+8, 1, 'If No, specify details:', fmt_locked)
ws.write(r1+8, 2, '', fmt_input)
# Energy
ws.write(r1+9, 1, 'Manse Energy Arrangement', fmt_locked)  # Excel C14
ws.data_validation(r1+9, 2, r1+9, 2, list_energy)
ws.write(r1+9, 2, 'Appointee Pays', fmt_input)
ws.write(r1+10, 1, 'Est. Energy Cost (If Church Pays)', fmt_locked)  # Excel C15
ws.write(r1+10, 2, 0, fmt_currency_input)
# Rental Cost
ws.write(r1+11, 1, 'Actual Rent Paid to Landlord', fmt_locked)  # Excel C16
ws.write(r1+11, 2, 0, fmt_currency_input)
ws.write(r1+11, 3, 'Only if "Church Rented Manse" is selected')
# === SECTION 2: CALCULATED TERMS ===
r2 = r1 + 11 + 2
ws.write(r2, 1, '2. TERMS OF APPOINTMENT (Calculated)', fmt_header)
ws.write(r2, 2, 'AMOUNT (ANNUAL)', fmt_header)
r2_data = r2 + 1  # Start index for data rows
# Helper cell Z6 (off to the side of the visible form): concatenates
# Zone (C7) and Position (C8) into the "<Zone>_<Position>" key that the
# VLOOKUPs against Hidden_Data use.
ws.write_formula('Z6', '=C7&"_"&C8') # Key
# --- RELATIVE ROW MAPPING (0-based offset from r2_data) ---
# 0: Cash Stipend
# 1: Header (EPFB Calc)
# 2: Gross EPFB
# 3: Manse Deduction
# 4: Net EPFB
# 5: Header (Allowances)
# 6: MEA
# 7: Super
# 8: Rent
# 9: Energy
# 10: Total
# NOTE: xlsxwriter row indices are 0-based while Excel cell references are
# 1-based, so a value written at row r is addressed as C{r+1} in formulas.

# --- BASE FORMULAS ---
# C9 = Work Load %, C10 = optional Stipend Loading %.
f_load_factor = '(C9 * (1 + C10))'
f_cash_calc = f'=IFERROR(VLOOKUP(Z6, Hidden_Data!A:E, 2, FALSE) * {f_load_factor}, 0)'
f_epfb_calc = f'IFERROR(VLOOKUP(Z6, Hidden_Data!A:E, 3, FALSE) * {f_load_factor}, 0)'
# Deduction applies whenever any manse is provided (C11), looked up by zone (C7).
f_manse_ded_calc = 'IF(C11<>"No Manse", IFERROR(VLOOKUP(C7, Hidden_Data!G:H, 2, FALSE), 0), 0)'
# MEA scales with workload (C9) only — the stipend loading does not apply.
f_mea_calc = '=IFERROR(VLOOKUP(Z6, Hidden_Data!A:E, 4, FALSE) * C9, 0)'

# --- WRITE ROWS ---
# Row 0: Cash Stipend
ws.write(r2_data, 1, 'Cash Stipend', fmt_locked)
ws.write_formula(r2_data, 2, f_cash_calc, fmt_currency_locked)
# Row 1: Header
ws.write(r2_data+1, 1, '--- EPFB CALCULATION ---', fmt_subhead)
ws.write(r2_data+1, 2, '', fmt_subhead)
# Row 2: Gross EPFB
ws.write(r2_data+2, 1, 'Gross EPFB Entitlement', fmt_locked)
ws.write_formula(r2_data+2, 2, f_epfb_calc, fmt_currency_locked)
# Row 3: Manse Deduction — stored as a positive number, displayed negative
# via fmt_deduction's '-$#,##0' number format.
ws.write(r2_data+3, 1, 'Less: Manse Deduction', fmt_locked)
ws.write_formula(r2_data+3, 2, f_manse_ded_calc, fmt_deduction)
ws.write(r2_data+3, 3, '($26,104 Urban/Prov | $24,388 Rural)', fmt_locked)
# Row 4: Net EPFB = Gross (row 2 -> Excel C{r2_data+3}) minus Deduction
# (row 3 -> Excel C{r2_data+4}), floored at zero.
ws.write(r2_data+4, 1, 'Net EPFB Payable', workbook.add_format({'bold': True, 'bg_color': '#D9D9D9', 'border': 1}))
ws.write_formula(r2_data+4, 2, f'=MAX(0, C{r2_data+3} - C{r2_data+4})', fmt_currency_locked)
# Row 5: Header
ws.write(r2_data+5, 1, '--- ALLOWANCES & SUPER ---', fmt_subhead)
ws.write(r2_data+5, 2, '', fmt_subhead)
# Row 6: MEA
ws.write(r2_data+6, 1, 'Ministry Expense Allowance (MEA)', fmt_locked)
ws.write_formula(r2_data+6, 2, f_mea_calc, fmt_currency_locked)
# Row 7: Superannuation = 15% of (Cash + Gross EPFB).
# NOTE(review): this references Gross EPFB (C{r2_data+3}), not Net EPFB —
# confirm super is intended to be calculated before the manse deduction.
ws.write(r2_data+7, 1, 'Superannuation (15%)', fmt_locked)
ws.write_formula(r2_data+7, 2, f'=0.15 * (C{r2_data+1} + C{r2_data+3})', fmt_currency_locked)
# Row 8: Rent — only payable when the church rents the manse (inputs C11/C16).
ws.write(r2_data+8, 1, 'Manse Rent (Paid to Landlord)', fmt_locked)
ws.write_formula(r2_data+8, 2, '=IF(C11="Church Rented Manse", C16, 0)', fmt_currency_locked)
# Row 9: Energy — zero when the appointee pays (C14); otherwise the estimate in C15.
ws.write(r2_data+9, 1, 'Manse Energy (Paid by Church)', fmt_locked)
ws.write_formula(r2_data+9, 2, '=IF(C14="Appointee Pays", 0, C15)', fmt_currency_locked)
# Row 10: TOTAL = Cash(0) + NetEPFB(4) + MEA(6) + Super(7) + Rent(8) + Energy(9),
# each 0-based offset converted to a 1-based Excel reference (+1).
ws.write(r2_data+10, 1, 'TOTAL COST OF NEW POSITION', workbook.add_format({'bold': True, 'bg_color': '#D9D9D9', 'border': 1}))
ws.write_formula(r2_data+10, 2, f'=C{r2_data+1} + C{r2_data+5} + C{r2_data+7} + C{r2_data+8} + C{r2_data+9} + C{r2_data+10}', fmt_total)
row_total_cost = r2_data + 10  # Reference for later sections
# === SECTION 3: CHURCH STATISTICS ===
# Two blank rows of separation below the Section 2 total.
r3 = r2_data + 10 + 2
ws.write(r3, 1, '3. CHURCH STATISTICS (Form B - Sec A)', fmt_header)
ws.write(r3, 2, 'DATA', fmt_header)
r3_data = r3 + 1
ws.write(r3_data, 1, 'Preaching Place 1', fmt_locked)
ws.write(r3_data, 2, '', fmt_input)
ws.write(r3_data+1, 1, 'Preaching Place 2', fmt_locked)
ws.write(r3_data+1, 2, '', fmt_input)
ws.write(r3_data+2, 1, 'Preaching Place 3', fmt_locked)
ws.write(r3_data+2, 2, '', fmt_input)
ws.write(r3_data+3, 1, 'Avg Attendance (Last 6 Months)', fmt_locked)
ws.write(r3_data+3, 2, 0, fmt_int_input)
ws.write(r3_data+4, 1, 'Avg Giving Per Month (Last 6 Months)', fmt_locked)
ws.write(r3_data+4, 2, 0, fmt_currency_input)
# === SECTION 4: FINANCIAL POSITION ===
# Layout (0-based rows): header at r4, bank subheading at r4+1, four
# account rows, investments subheading, three asset rows, then the total.
r4 = r3_data + 4 + 2
ws.write(r4, 1, '4. FINANCIAL POSITION (Form B - Sec B & C)', fmt_header)
ws.write(r4, 2, 'AMOUNT', fmt_header)
ws.write(r4, 3, 'INTEREST (Est)', fmt_header)
ws.write(r4, 4, 'TO REVENUE', fmt_header)
r4_curr = r4 + 1
ws.write(r4_curr, 1, 'ASSETS: Bank Balances & Cash', fmt_subhead)
ws.write(r4_curr, 2, '', fmt_subhead)
ws.write(r4_curr, 3, '', fmt_subhead)
ws.write(r4_curr, 4, '', fmt_subhead)
r4_curr += 1
for i in range(4):
    ws.write(r4_curr, 1, f'Account {i+1} Name:', fmt_locked)
    ws.write(r4_curr, 2, 0, fmt_currency_input)
    r4_curr += 1
ws.write(r4_curr, 1, 'ASSETS: Investments / Trusts', fmt_subhead)
ws.write(r4_curr, 2, 'Capital Value', fmt_subhead_c)
ws.write(r4_curr, 3, 'Interest', fmt_subhead_c)
ws.write(r4_curr, 4, 'Used in Rev.', fmt_subhead_c)
r4_curr += 1
start_asset_row = r4_curr
for i in range(3):
    ws.write(r4_curr, 1, f'Asset/Trust {i+1}:', fmt_locked)
    ws.write(r4_curr, 2, 0, fmt_currency_input)
    ws.write(r4_curr, 3, 0, fmt_currency_input)
    ws.write(r4_curr, 4, 0, fmt_currency_input)
    r4_curr += 1
end_asset_row = r4_curr - 1
ws.write(r4_curr, 1, 'TOTAL ASSETS', fmt_total)
# BUGFIX: convert the 0-based row indices to 1-based Excel references (+1),
# as done in the other formulas in this script. The previous range
# C{r4+2}:C{r4_curr-1} started on the subheading row and missed the final
# asset row. Excel rows r4+3 .. r4_curr cover every account and asset row;
# the text-only investments subheading inside the range is ignored by SUM.
ws.write_formula(r4_curr, 2, f'=SUM(C{r4+3}:C{r4_curr})', fmt_total)
r4_curr += 1
# Liabilities block: subheading, five input rows, then the total.
ws.write(r4_curr, 1, 'LIABILITIES (Debts & Arrears)', fmt_subhead)
ws.write(r4_curr, 2, '', fmt_subhead)
r4_curr += 1
start_liab = r4_curr
for item in ['Mortgage / Loan 1', 'Mortgage / Loan 2', 'Arrears: MSF', 'Arrears: Assessments', 'Other Debts']:
    ws.write(r4_curr, 1, item, fmt_locked)
    ws.write(r4_curr, 2, 0, fmt_currency_input)
    r4_curr += 1
ws.write(r4_curr, 1, 'TOTAL LIABILITIES', fmt_total)
# BUGFIX: +1 converts 0-based rows to 1-based Excel references. The previous
# range C{start_liab}:C{r4_curr-1} began on the subheading row and dropped
# the final liability row ('Other Debts') from the total.
ws.write_formula(r4_curr, 2, f'=SUM(C{start_liab+1}:C{r4_curr})', fmt_total)
r4_last = r4_curr
# === SECTION 5: ESTIMATED REVENUE ===
r5 = r4_last + 2
ws.write(r5, 1, '5. ESTIMATED REVENUE (Form B - Sec E)', fmt_header)
ws.write(r5, 2, 'AMOUNT', fmt_header)
r5_data = r5 + 1
ws.write(r5_data, 1, 'Collections / Contributions', fmt_locked)
ws.write(r5_data, 2, 0, fmt_currency_input)
# Pulled from Section 4's "TO REVENUE" column (E) across the asset rows.
ws.write(r5_data+1, 1, 'Income from Assets or Trusts', fmt_locked)
ws.write_formula(r5_data+1, 2, f'=SUM(E{start_asset_row+1}:E{end_asset_row+1})', fmt_currency_locked)
ws.write(r5_data+2, 1, 'Other Revenues (Donations, Solar, etc.)', fmt_locked)
ws.write(r5_data+2, 2, 0, fmt_currency_input)
r5_total = r5_data + 3
ws.write(r5_total, 1, 'TOTAL ESTIMATED REVENUE', fmt_subhead)
# BUGFIX: the three revenue rows occupy Excel rows r5_data+1 .. r5_total
# (0-based r5_data .. r5_data+2). The previous upper bound C{r5_total-1}
# excluded the 'Other Revenues' row from the total.
ws.write_formula(r5_total, 2, f'=SUM(C{r5_data+1}:C{r5_total})', fmt_total)
# === SECTION 6: ESTIMATED EXPENDITURE ===
r6 = r5_total + 2
ws.write(r6, 1, '6. ESTIMATED EXPENDITURE (Form B - Sec F)', fmt_header)
ws.write(r6, 2, 'AMOUNT', fmt_header)
r6_curr = r6 + 1
ws.write(r6_curr, 1, 'Staffing Costs', fmt_subhead)
ws.write(r6_curr, 2, '', fmt_subhead)
r6_curr += 1
# Link to Section 2's totals. "Total Terms" = Total Cost minus Super; the
# Super row sits three rows above the Section 2 total (offsets 7 vs 10),
# and +1 converts 0-based rows to 1-based Excel references.
ws.write(r6_curr, 1, 'Total Terms of Appt (New Appointee)', fmt_locked)
ws.write_formula(r6_curr, 2, f'=C{row_total_cost+1} - C{row_total_cost+1-3}', fmt_currency_locked)
r6_curr += 1
ws.write(r6_curr, 1, 'Superannuation (New Appointee)', fmt_locked)
ws.write_formula(r6_curr, 2, f'=C{row_total_cost+1-3}', fmt_currency_locked)
r6_curr += 1
ws.write(r6_curr, 1, 'Other Ministry Workers', fmt_locked)
ws.write(r6_curr, 2, 0, fmt_currency_input)
r6_curr += 1
ws.write(r6_curr, 1, 'Non-religious workers', fmt_locked)
ws.write(r6_curr, 2, 0, fmt_currency_input)
r6_curr += 1
ws.write(r6_curr, 1, 'Assessments & Mission', fmt_subhead)
ws.write(r6_curr, 2, '', fmt_subhead)
r6_curr += 1
for item in ['MSF Assessment', 'Assembly Assessments', 'State Mission Program', 'Presbytery Levy', 'Missionaries Support']:
    ws.write(r6_curr, 1, item, fmt_locked)
    ws.write(r6_curr, 2, 0, fmt_currency_input)
    r6_curr += 1
ws.write(r6_curr, 1, 'Property & Operations', fmt_subhead)
ws.write(r6_curr, 2, '', fmt_subhead)
r6_curr += 1
for item in ['Rates, Land Tax', 'Electricity / Gas / Maint', 'General Operating', 'Insurances']:
    ws.write(r6_curr, 1, item, fmt_locked)
    ws.write(r6_curr, 2, 0, fmt_currency_input)
    r6_curr += 1
ws.write(r6_curr, 1, 'Other', fmt_subhead)
ws.write(r6_curr, 2, '', fmt_subhead)
r6_curr += 1
for item in ['Printing/Ed/Licences', 'Loan/Capital Repayment', 'Major Works', 'Other/Sundry']:
    ws.write(r6_curr, 1, item, fmt_locked)
    ws.write(r6_curr, 2, 0, fmt_currency_input)
    r6_curr += 1
ws.write(r6_curr, 1, 'TOTAL ESTIMATED EXPENDITURE', fmt_subhead)
# BUGFIX: +1 converts 0-based rows to 1-based Excel references. The previous
# range C{r6+2}:C{r6_curr-1} started on the 'Staffing Costs' subheading row
# and missed the final 'Other/Sundry' row. Excel rows r6+3 .. r6_curr span
# every expenditure row; text-only subheadings in between are ignored by SUM.
ws.write_formula(r6_curr, 2, f'=SUM(C{r6+3}:C{r6_curr})', fmt_total)
r6_total_row = r6_curr
# === SECTION 7: RESULT ===
r7 = r6_total_row + 2
ws.write(r7, 1, '7. ESTIMATED SURPLUS / (DEFICIT)', workbook.add_format({'bold': True, 'font_size': 12, 'border': 1}))
# Revenue total minus expenditure total (+1 converts to Excel references).
ws.write_formula(r7, 2, f'=C{r5_total+1}-C{r6_total_row+1}', fmt_result)

# --- PROTECT ---
# Lock the sheet, then unlock the input formats so users can type in the
# yellow cells. Format properties are finalised when the workbook is saved,
# so flipping 'locked' here still applies to cells written earlier.
ws.protect('password', {'select_locked_cells': True, 'select_unlocked_cells': True})
fmt_input.set_locked(False)
fmt_currency_input.set_locked(False)
fmt_percent.set_locked(False)
fmt_int_input.set_locked(False)

# BUGFIX: report the actual output filename instead of a hard-coded string.
print(f"File '{filename}' generated successfully.")
writer.close()

100
file tools/list_s3_files.py Normal file
View file

@@ -0,0 +1,100 @@
import argparse
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
import logging
from urllib.parse import quote
# --- Configuration ---
# Set up logging for clear, informative output.
# Timestamped, INFO-level messages for all module logging below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def get_s3_client():
    """
    Build a Boto3 S3 client and verify it can actually reach AWS.

    Returns the client on success, or None when credentials are missing
    or an AWS client error occurs (the problem is logged either way).
    """
    try:
        client = boto3.client('s3')
        # Cheap sanity call: fails fast if the credentials are unusable.
        client.list_buckets()
    except NoCredentialsError:
        logging.error("AWS credentials not found. Please run 'aws configure' or set up environment variables.")
        return None
    except ClientError as e:
        logging.error(f"An AWS client error occurred: {e}")
        return None
    return client
def get_bucket_region(s3_client, bucket_name):
    """
    Return the AWS region hosting *bucket_name*, or None on failure.

    S3 reports buckets in us-east-1 with a LocationConstraint of None,
    so that case is normalised to the literal region name.
    """
    try:
        location = s3_client.get_bucket_location(Bucket=bucket_name).get('LocationConstraint')
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchBucket':
            logging.error(f"The bucket '{bucket_name}' does not exist.")
        else:
            logging.error(f"Could not get location for bucket '{bucket_name}': {e}")
        return None
    return 'us-east-1' if location is None else location
def list_files_and_generate_urls(s3_client, bucket_name):
    """
    Print the key and public-style URL of every object in *bucket_name*.

    Uses get_bucket_region() to build the virtual-hosted URL prefix and a
    paginator so buckets holding more than 1000 objects are fully listed.
    """
    logging.info(f"Fetching region for bucket '{bucket_name}'...")
    region = get_bucket_region(s3_client, bucket_name)
    if not region:
        logging.error("Aborting due to failure in retrieving bucket region.")
        return
    logging.info(f"Bucket is in region: {region}")
    logging.info("Listing files and generating URLs...")
    # URL shape: https://<bucket-name>.s3.<region>.amazonaws.com/<key>
    base_url = f"https://{bucket_name}.s3.{region}.amazonaws.com/"
    file_count = 0
    try:
        pages = s3_client.get_paginator('list_objects_v2').paginate(Bucket=bucket_name)
        for page in pages:
            # Empty buckets yield pages without a "Contents" entry.
            for obj in page.get('Contents', []):
                object_key = obj['Key']
                # Percent-encode the key so spaces etc. form a valid URL.
                file_url = f"{base_url}{quote(object_key)}"
                print(f"File: {object_key}\nURL: {file_url}\n")
                file_count += 1
        logging.info("="*30)
        if file_count:
            logging.info(f"Found {file_count} file(s) in '{bucket_name}'.")
        else:
            logging.info(f"The bucket '{bucket_name}' is empty.")
        logging.info("="*30)
    except ClientError as e:
        logging.error(f"An error occurred while listing files: {e}")
if __name__ == "__main__":
    # --- Command-Line Argument Parsing ---
    # Usage: python list_s3_files.py <bucket_name>
    parser = argparse.ArgumentParser(description="List files in an S3 bucket and generate their public URLs.")
    parser.add_argument("bucket_name", help="The name of the S3 bucket.")
    args = parser.parse_args()
    # Only proceed when a working, credentialed client could be created.
    s3 = get_s3_client()
    if s3:
        list_files_and_generate_urls(s3, args.bucket_name)

295
file tools/s3_gui_tool.py Normal file
View file

@@ -0,0 +1,295 @@
import os
import argparse
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
import logging
from datetime import datetime, timezone
from urllib.parse import quote
import tkinter as tk
from tkinter import filedialog, messagebox, scrolledtext, ttk
import threading
import queue
# --- S3 Logic (Adapted from previous scripts) ---
# Note: Logging is now redirected to the GUI's text area.
class QueueHandler(logging.Handler):
    """Logging handler that forwards formatted records into a queue.

    Worker threads can log freely; the GUI thread drains the queue on the
    Tk event loop, keeping widget updates off background threads.
    """

    def __init__(self, log_queue):
        super().__init__()
        self.log_queue = log_queue

    def emit(self, record):
        # Format here so consumers receive ready-to-display strings.
        self.log_queue.put(self.format(record))
def get_s3_client(log_queue):
    """Return a verified Boto3 S3 client, or None (errors go to *log_queue*)."""
    try:
        client = boto3.client('s3')
        # Fails fast when credentials are missing or rejected.
        client.list_buckets()
    except NoCredentialsError:
        log_queue.put("ERROR: AWS credentials not found. Please configure them.")
        return None
    except ClientError as e:
        log_queue.put(f"ERROR: An AWS client error occurred: {e}")
        return None
    return client
def get_bucket_region(s3_client, bucket_name, log_queue):
    """Return the bucket's region ('us-east-1' when unset), or None on error."""
    try:
        constraint = s3_client.get_bucket_location(Bucket=bucket_name).get('LocationConstraint')
    except ClientError as e:
        log_queue.put(f"ERROR: Could not get bucket region: {e}")
        return None
    # us-east-1 buckets report a LocationConstraint of None.
    return 'us-east-1' if constraint is None else constraint
def sync_folder_to_s3(local_folder, bucket_name, delete_extra_files, log_queue):
    """Syncs a local folder to an S3 bucket.

    Uploads files that are new or whose local mtime is newer than the S3
    object's LastModified; optionally deletes S3 objects with no local
    counterpart. Progress and a summary are reported via log_queue.

    Args:
        local_folder: Path of the directory to mirror.
        bucket_name: Destination S3 bucket name.
        delete_extra_files: When True, remove S3 objects absent locally.
        log_queue: queue.Queue receiving progress/error strings.
    """
    s3_client = get_s3_client(log_queue)
    if not s3_client: return
    if not os.path.isdir(local_folder):
        log_queue.put(f"ERROR: Local directory not found: {local_folder}")
        return
    log_queue.put(f"Starting sync from '{local_folder}' to S3 bucket '{bucket_name}'...")
    try:
        # Snapshot of bucket contents: key -> LastModified (tz-aware).
        paginator = s3_client.get_paginator('list_objects_v2')
        s3_objects = {obj['Key']: obj['LastModified'] for page in paginator.paginate(Bucket=bucket_name) if "Contents" in page for obj in page['Contents']}
    except ClientError as e:
        log_queue.put(f"ERROR: Could not list S3 objects: {e}")
        return
    local_files, upload_count, skip_count = set(), 0, 0
    for root, _, files in os.walk(local_folder):
        for filename in files:
            local_path = os.path.join(root, filename)
            # S3 keys always use forward slashes, regardless of OS.
            s3_key = os.path.relpath(local_path, local_folder).replace(os.path.sep, '/')
            local_files.add(s3_key)
            # Compare in UTC so the mtime check matches S3's timestamps.
            local_mtime = datetime.fromtimestamp(os.path.getmtime(local_path), tz=timezone.utc)
            if s3_key not in s3_objects or local_mtime > s3_objects[s3_key]:
                try:
                    log_queue.put(f"Uploading: {s3_key}")
                    s3_client.upload_file(local_path, bucket_name, s3_key)
                    upload_count += 1
                except ClientError as e:
                    log_queue.put(f"ERROR: Failed to upload {s3_key}: {e}")
            else:
                skip_count += 1
    delete_count = 0
    if delete_extra_files:
        log_queue.put("Checking for files to delete from S3...")
        to_delete = [{'Key': key} for key in s3_objects if key not in local_files]
        if to_delete:
            # delete_objects accepts at most 1000 keys per request.
            for i in range(0, len(to_delete), 1000):
                chunk = to_delete[i:i + 1000]
                log_queue.put(f"Deleting {len(chunk)} files from S3...")
                s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': chunk})
                delete_count += len(chunk)
    log_queue.put("="*30 + "\nSync Summary\n" + "="*30)
    log_queue.put(f" - Uploaded: {upload_count} file(s)")
    log_queue.put(f" - Skipped: {skip_count} file(s) (up-to-date)")
    if delete_extra_files:
        log_queue.put(f" - Deleted: {delete_count} file(s) from S3")
    log_queue.put("Sync complete.")
def list_s3_buckets(log_queue=None):
    """Return the names of all S3 buckets visible to the caller.

    On credential or client errors an empty list is returned; the error
    is reported via *log_queue* when one is supplied.
    """
    try:
        response = boto3.client('s3').list_buckets()
    except NoCredentialsError:
        if log_queue:
            log_queue.put("ERROR: AWS credentials not found. Please configure them.")
        return []
    except ClientError as e:
        if log_queue:
            log_queue.put(f"ERROR: Could not list buckets: {e}")
        return []
    return [bucket['Name'] for bucket in response.get('Buckets', [])]
def list_files_and_generate_urls(bucket_name, log_queue):
    """Push a "File/URL" entry for every object in *bucket_name* onto log_queue."""
    s3_client = get_s3_client(log_queue)
    if not s3_client:
        return
    region = get_bucket_region(s3_client, bucket_name, log_queue)
    if not region:
        return
    log_queue.put(f"Bucket is in region: {region}")
    log_queue.put("Listing files and generating URLs...")
    # Virtual-hosted style URL prefix for this bucket/region.
    base_url = f"https://{bucket_name}.s3.{region}.amazonaws.com/"
    file_count = 0
    try:
        paginator = s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name):
            # Empty buckets produce pages with no "Contents" entry.
            for obj in page.get('Contents', []):
                encoded_key = quote(obj['Key'])
                log_queue.put(f"File: {obj['Key']}\nURL: {base_url}{encoded_key}\n")
                file_count += 1
        log_queue.put("="*30)
        log_queue.put(f"Found {file_count} file(s) in '{bucket_name}'.")
    except ClientError as e:
        log_queue.put(f"ERROR: An error occurred: {e}")
# --- GUI Application ---
class S3ToolApp:
    """Tkinter front-end for the S3 sync/list helpers.

    Worker functions run on daemon threads and report progress through
    self.log_queue; process_queue() polls that queue from the Tk event
    loop, so widgets are only ever touched on the main thread.
    """

    def __init__(self, root):
        self.root = root
        self.root.title("S3 Sync & Lister Tool")
        self.root.geometry("750x600")
        # Cross-thread channel: workers put strings, the GUI drains them.
        self.log_queue = queue.Queue()
        # --- UI Frames ---
        control_frame = tk.Frame(root, padx=10, pady=10)
        control_frame.pack(fill='x')
        sync_frame = tk.LabelFrame(control_frame, text="Sync Local Folder to S3", padx=10, pady=10)
        sync_frame.pack(fill='x', expand=True, side='left', padx=(0, 5))
        list_frame = tk.LabelFrame(control_frame, text="List S3 Bucket Files", padx=10, pady=10)
        list_frame.pack(fill='x', expand=True, side='right', padx=(5, 0))
        output_frame = tk.Frame(root, padx=10, pady=10)
        output_frame.pack(fill='both', expand=True)
        # --- Common Widgets ---
        bucket_label_frame = tk.Frame(sync_frame)
        bucket_label_frame.pack(fill='x', pady=(0, 5))
        tk.Label(bucket_label_frame, text="S3 Bucket Name:").pack(side='left')
        self.refresh_button = tk.Button(bucket_label_frame, text="Refresh Buckets", command=self.refresh_buckets)
        self.refresh_button.pack(side='right')
        # Shared by both the sync and list actions.
        self.bucket_combobox = ttk.Combobox(sync_frame, width=37, state='readonly')
        self.bucket_combobox.pack(fill='x', pady=(0, 10))
        # Load buckets on startup
        self.refresh_buckets()
        # --- Sync Widgets ---
        folder_frame = tk.Frame(sync_frame)
        folder_frame.pack(fill='x', pady=(0, 10))
        tk.Label(folder_frame, text="Local Folder:").pack(side='left')
        self.folder_path = tk.StringVar()
        tk.Entry(folder_frame, textvariable=self.folder_path, width=30).pack(side='left', fill='x', expand=True)
        tk.Button(folder_frame, text="Browse...", command=self.browse_folder).pack(side='right')
        self.delete_var = tk.BooleanVar()
        tk.Checkbutton(sync_frame, text="Delete files in S3 not present locally", variable=self.delete_var).pack(anchor='w')
        self.sync_button = tk.Button(sync_frame, text="Sync to S3", command=self.start_sync_thread)
        self.sync_button.pack(pady=(10,0))
        # --- List Widgets ---
        tk.Label(list_frame, text="Select bucket from dropdown in left panel.").pack(pady=(15,0))
        self.list_button = tk.Button(list_frame, text="List Files & Generate URLs", command=self.start_list_thread)
        self.list_button.pack(pady=10)
        # --- Output Text Area ---
        self.log_text = scrolledtext.ScrolledText(output_frame, state='disabled', wrap=tk.WORD, height=20)
        self.log_text.pack(fill='both', expand=True)
        # Start the periodic queue poll (every 100 ms).
        self.root.after(100, self.process_queue)

    def browse_folder(self):
        """Open a directory picker and store the chosen path."""
        folder = filedialog.askdirectory()
        if folder:
            self.folder_path.set(folder)

    def refresh_buckets(self):
        """Refresh the list of available S3 buckets (fetched on a worker thread)."""
        self.bucket_combobox.config(state='normal')
        self.bucket_combobox.set('Loading buckets...')
        self.bucket_combobox.config(state='disabled')
        self.refresh_button.config(state='disabled')
        def load_buckets():
            buckets = list_s3_buckets(self.log_queue)
            # Hand the result back to the Tk thread via after().
            self.root.after(0, lambda: self.update_bucket_list(buckets))
        thread = threading.Thread(target=load_buckets)
        thread.daemon = True
        thread.start()

    def update_bucket_list(self, buckets):
        """Update the combobox with the list of buckets."""
        self.bucket_combobox.config(state='normal')
        if buckets:
            self.bucket_combobox['values'] = buckets
            self.bucket_combobox.set('')
            self.bucket_combobox.config(state='readonly')
        else:
            self.bucket_combobox.set('No buckets found or error')
            self.bucket_combobox['values'] = []
            self.bucket_combobox.config(state='disabled')
        self.refresh_button.config(state='normal')

    def start_sync_thread(self):
        """Validate inputs and launch the sync worker thread."""
        local_folder = self.folder_path.get()
        bucket_name = self.bucket_combobox.get()
        if not local_folder or not bucket_name:
            messagebox.showerror("Error", "Please provide both a local folder and a bucket name.")
            return
        self.toggle_buttons(False)
        thread = threading.Thread(target=self.run_task, args=(sync_folder_to_s3, local_folder, bucket_name, self.delete_var.get(), self.log_queue))
        thread.daemon = True
        thread.start()

    def start_list_thread(self):
        """Validate the bucket selection and launch the list worker thread."""
        bucket_name = self.bucket_combobox.get()
        if not bucket_name:
            messagebox.showerror("Error", "Please select a bucket name from the dropdown.")
            return
        self.toggle_buttons(False)
        thread = threading.Thread(target=self.run_task, args=(list_files_and_generate_urls, bucket_name, self.log_queue))
        thread.daemon = True
        thread.start()

    def run_task(self, task_func, *args):
        """Run task_func(*args), always signalling completion via the queue."""
        self.clear_log()
        try:
            task_func(*args)
        finally:
            self.log_queue.put("TASK_COMPLETE") # Signal to re-enable buttons

    def toggle_buttons(self, enabled):
        """Enable or disable the two action buttons while a task runs."""
        state = 'normal' if enabled else 'disabled'
        self.sync_button.config(state=state)
        self.list_button.config(state=state)

    def process_queue(self):
        """Drain log_queue into the text area; reschedules itself every 100 ms."""
        try:
            while True:
                msg = self.log_queue.get_nowait()
                if msg == "TASK_COMPLETE":
                    self.toggle_buttons(True)
                else:
                    # Widget is kept 'disabled' (read-only) except while inserting.
                    self.log_text.config(state='normal')
                    self.log_text.insert(tk.END, msg + '\n')
                    self.log_text.config(state='disabled')
                    self.log_text.see(tk.END)
        except queue.Empty:
            pass
        self.root.after(100, self.process_queue)

    def clear_log(self):
        """Empty the output text area."""
        self.log_text.config(state='normal')
        self.log_text.delete(1.0, tk.END)
        self.log_text.config(state='disabled')
if __name__ == "__main__":
    # Build the Tk root window, attach the application, and enter the event loop.
    main_window = tk.Tk()
    app = S3ToolApp(main_window)
    main_window.mainloop()

156
file tools/s3_sync.py Normal file
View file

@ -0,0 +1,156 @@
import os
import argparse
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
import logging
from datetime import datetime, timezone
# --- Configuration ---
# Set up logging to provide clear output
# Every message is prefixed with a timestamp and its severity level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def get_s3_client():
    """Create a Boto3 S3 client and verify that its credentials work.

    Returns:
        The client on success, or None after logging a descriptive error
        (missing credentials, bad access key, or any other failure).
    """
    try:
        # Boto3 resolves credentials from its standard chain:
        # environment variables first, then the ~/.aws/credentials file.
        client = boto3.client('s3')
        # Cheap authenticated call so credential problems surface here
        # instead of partway through a sync.
        client.list_buckets()
    except NoCredentialsError:
        logging.error("AWS credentials not found. Please configure them using 'aws configure' or environment variables.")
    except ClientError as e:
        if e.response['Error']['Code'] == 'InvalidAccessKeyId':
            logging.error("Invalid AWS Access Key ID. Please check your credentials.")
        else:
            logging.error(f"An AWS client error occurred: {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred during S3 client initialization: {e}")
    else:
        return client
    return None
def get_s3_objects(s3_client, bucket_name):
    """Return a dict mapping every object key in *bucket_name* to its
    LastModified timestamp.

    Returns None (after logging) when the bucket is missing or listing fails.
    """
    inventory = {}
    try:
        # list_objects_v2 returns at most 1000 keys per response; the
        # paginator transparently walks all pages.
        paginator = s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name):
            # An empty bucket (or empty trailing page) has no 'Contents'.
            for obj in page.get('Contents', []):
                inventory[obj['Key']] = obj['LastModified']
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchBucket':
            logging.error(f"The bucket '{bucket_name}' does not exist.")
        else:
            logging.error(f"Could not list objects in bucket '{bucket_name}': {e}")
        return None
    return inventory
def sync_folder_to_s3(s3_client, local_folder, bucket_name, delete_extra_files):
    """Sync the contents of *local_folder* up to the S3 bucket *bucket_name*.

    Uploads files that are new or whose local mtime is newer than the S3
    object's LastModified timestamp. When *delete_extra_files* is true,
    objects with no local counterpart are removed from the bucket.

    Args:
        s3_client: An initialized boto3 S3 client.
        local_folder: Path of the directory to mirror.
        bucket_name: Name of the target S3 bucket.
        delete_extra_files: If True, delete S3 objects missing locally.
    """
    if not os.path.isdir(local_folder):
        logging.error(f"Local directory not found: {local_folder}")
        return
    logging.info(f"Starting sync from '{local_folder}' to S3 bucket '{bucket_name}'...")
    s3_objects = get_s3_objects(s3_client, bucket_name)
    if s3_objects is None:
        logging.error("Aborting sync due to S3 error.")
        return
    local_files = set()
    upload_count = 0
    skip_count = 0
    # --- Step 1: Walk local directory and upload new/modified files ---
    for root, _, files in os.walk(local_folder):
        for filename in files:
            local_path = os.path.join(root, filename)
            # The path relative to the sync root becomes the S3 object key.
            relative_path = os.path.relpath(local_path, local_folder)
            # S3 keys always use forward slashes, regardless of local OS.
            s3_key = relative_path.replace(os.path.sep, '/')
            local_files.add(s3_key)
            # Compare timezone-aware timestamps; S3 LastModified is UTC.
            local_mtime_dt = datetime.fromtimestamp(os.path.getmtime(local_path), tz=timezone.utc)
            # Upload when the object is missing or the local copy is newer.
            if s3_key not in s3_objects or local_mtime_dt > s3_objects[s3_key]:
                try:
                    logging.info(f"Uploading: {s3_key}")
                    s3_client.upload_file(local_path, bucket_name, s3_key)
                    upload_count += 1
                except ClientError as e:
                    logging.error(f"Failed to upload {local_path}: {e}")
            else:
                logging.debug(f"Skipping (unchanged): {s3_key}")
                skip_count += 1
    logging.info("Local file scan complete.")
    # --- Step 2: Delete files from S3 that are not present locally (if enabled) ---
    delete_count = 0
    if delete_extra_files:
        logging.info("Checking for files to delete from S3...")
        s3_keys_to_delete = [
            {'Key': key} for key in s3_objects if key not in local_files
        ]
        if s3_keys_to_delete:
            # S3 delete_objects can handle up to 1000 keys at a time
            for i in range(0, len(s3_keys_to_delete), 1000):
                chunk = s3_keys_to_delete[i:i + 1000]
                try:
                    logging.info(f"Deleting {len(chunk)} files from S3...")
                    response = s3_client.delete_objects(
                        Bucket=bucket_name,
                        Delete={'Objects': chunk}
                    )
                    # delete_objects is best-effort per key: count only the
                    # confirmed deletions and surface any per-key failures
                    # instead of assuming the whole chunk succeeded.
                    delete_count += len(response.get('Deleted', []))
                    for err in response.get('Errors', []):
                        logging.error(
                            f"Failed to delete '{err.get('Key')}' from S3: "
                            f"{err.get('Code')} - {err.get('Message')}"
                        )
                except ClientError as e:
                    logging.error(f"Failed to delete objects from S3: {e}")
        else:
            logging.info("No files to delete from S3.")
    # --- Final Summary ---
    logging.info("="*30)
    logging.info("Sync Summary")
    logging.info(f" - Uploaded: {upload_count} files")
    logging.info(f" - Skipped: {skip_count} files (up-to-date)")
    if delete_extra_files:
        logging.info(f" - Deleted: {delete_count} files from S3")
    logging.info("Sync complete.")
    logging.info("="*30)
if __name__ == "__main__":
    # --- Command-Line Argument Parsing ---
    arg_parser = argparse.ArgumentParser(description="Sync a local folder to an Amazon S3 bucket.")
    arg_parser.add_argument("local_folder", help="The local folder to sync.")
    arg_parser.add_argument("bucket_name", help="The name of the S3 bucket.")
    arg_parser.add_argument(
        "--delete",
        action="store_true",
        help="Delete files from the S3 bucket that do not exist in the local folder."
    )
    cli_args = arg_parser.parse_args()
    # Only attempt the sync when a working, authenticated client was created.
    client = get_s3_client()
    if client is not None:
        sync_folder_to_s3(client, cli_args.local_folder, cli_args.bucket_name, cli_args.delete)